reactive Archives - Piotr's TechBlog
Java, Spring, Kotlin, microservices, Kubernetes, containers

Reactive Spring Boot with WebFlux, R2DBC and Postgres
Fri, 28 Jul 2023

In this article, you will learn how to implement and test reactive Spring Boot apps using Spring WebFlux, R2DBC, and Postgres database. We will create two simple apps written in Kotlin using the latest version of Spring Boot 3. Our apps expose some REST endpoints over HTTP. In order to test the communication between them and integration with the Postgres database, we will use Testcontainers and Netty Mock Server.

If you are looking for more guides to Spring Boot 3, take a look at other posts on my blog. For example, I describe how to build a microservices architecture with Spring Boot 3 and Spring Cloud. You can also read about the latest changes in observability with Spring Boot 3 and learn how to integrate your app with the Grafana stack. Of course, these are just a few examples – you can find more content in this area on my blog.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. It contains two apps inside employee-service and organization-service directories. After that, you should just follow my instructions.

Dependencies

In the first step, we will add several dependencies related to Kotlin. Besides the standard libraries, we can include Kotlin support for Jackson (JSON serialization/deserialization):

<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-reflect</artifactId>
</dependency>

We also need to include two Spring Boot starters. In order to create a reactive Spring @RestController, we need the Spring WebFlux module. With the Spring Boot Data R2DBC starter, we can use Spring Data repositories in a reactive way. Finally, we have to include the Postgres driver provided by R2DBC.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <scope>runtime</scope>
</dependency>

There are several testing dependencies in our project. We need to include a standard Spring Boot Test Starter, Testcontainers with JUnit 5, Postgres, and R2DBC support, and finally Mock Server Netty module for mocking reactive API. It is also worth adding the spring-boot-testcontainers module to take advantage of built-in integration between Spring Boot and Testcontainers.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>r2dbc</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-testcontainers</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.mock-server</groupId>
  <artifactId>mockserver-netty</artifactId>
  <version>5.15.0</version>
  <scope>test</scope>
</dependency>

The last dependency is optional. We can include Spring Boot Actuator in our apps. It adds the R2DBC connection status to health checks and several metrics describing the connection pool status.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Implement the Spring Reactive App

Here’s our model class for the first app – employee-service:

class Employee(val name: String, 
               val salary: Int, 
               val organizationId: Int) {
    @Id var id: Int? = null
}

Here’s the repository interface. It needs to extend the R2dbcRepository interface. As with standard Spring Data repositories, we can define several finder methods. However, instead of returning entities directly, they wrap the results in Reactor’s Mono or Flux.

interface EmployeeRepository: R2dbcRepository<Employee, Int> {
    fun findByOrganizationId(organizationId: Int): Flux<Employee>
}

Here’s the implementation of our @RestController. We need to inject the EmployeeRepository bean. Then we use the repository bean to interact with the database in a reactive way. Our endpoints also return objects wrapped by the Reactor Mono and Flux. There are three find endpoints and a single POST endpoint:

  • Searching all the employees (1)
  • Searching by the employee id (2)
  • Searching all employees by the organization id (3)
  • Adding new employees (4)
@RestController
@RequestMapping("/employees")
class EmployeeController {

   @Autowired
   lateinit var repository : EmployeeRepository

   @GetMapping // (1)
   fun findAll() : Flux<Employee> = repository.findAll()

   @GetMapping("/{id}") // (2)
   fun findById(@PathVariable id: Int) : Mono<Employee> = 
      repository.findById(id)

   @GetMapping("/organization/{organizationId}") // (3)
   fun findByOrganizationId(@PathVariable organizationId: Int): 
      Flux<Employee> = repository.findByOrganizationId(organizationId)

   @PostMapping // (4)
   fun add(@RequestBody employee: Employee) : Mono<Employee> = 
      repository.save(employee)

}

We also need to configure database connection settings in the Spring Boot application.yml:

spring:
  application:
    name: employee-service
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/spring
    username: spring
    password: spring123

Here’s our main class. We want our app to create a table in the database on startup. With R2DBC we need to register a ConnectionFactoryInitializer bean that populates the schema from the schema.sql file.

@SpringBootApplication
class EmployeeApplication {

   @Bean
   fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? {
      val initializer = ConnectionFactoryInitializer()
      initializer.setConnectionFactory(connectionFactory)
      initializer.setDatabasePopulator(
         ResourceDatabasePopulator(ClassPathResource("schema.sql")))
      return initializer
   }

}

fun main(args: Array<String>) {
   runApplication<EmployeeApplication>(*args)
}

Then just place the schema.sql file in the src/main/resources directory.

CREATE TABLE employee (
  id SERIAL PRIMARY KEY, 
  name VARCHAR(255), 
  salary INT, 
  organization_id INT
);

Let’s switch to the organization-service. The implementation is pretty similar. Here’s our domain model class:

class Organization(var name: String) {
    @Id var id: Int? = null
}

Our app is communicating with the employee-service. Therefore, we need to define the WebClient bean. It gets the address of the target service from application properties.

@SpringBootApplication
class OrganizationApplication {

   @Bean
   fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? {
      val initializer = ConnectionFactoryInitializer()
      initializer.setConnectionFactory(connectionFactory)
      initializer.setDatabasePopulator(
         ResourceDatabasePopulator(ClassPathResource("schema.sql")))
      return initializer
   }

   @Value("\${employee.client.url}")
   private lateinit var employeeUrl: String

   @Bean
   fun webClient(builder: WebClient.Builder): WebClient {
      return builder.baseUrl(employeeUrl).build()
   }
}

fun main(args: Array<String>) {
    runApplication<OrganizationApplication>(*args)
}

There is also the repository interface OrganizationRepository. Our @RestController uses the repository bean to interact with the database and the WebClient bean to call the endpoint exposed by the employee-service. As the response from GET /organizations/{id}/with-employees, it returns the OrganizationDTO.

@RestController
@RequestMapping("/organizations")
class OrganizationController {

   @Autowired
   lateinit var repository : OrganizationRepository
   @Autowired
   lateinit var client : WebClient

   @GetMapping
   fun findAll() : Flux<Organization> = repository.findAll()

   @GetMapping("/{id}")
   fun findById(@PathVariable id : Int): Mono<Organization> = 
      repository.findById(id)

   @GetMapping("/{id}/with-employees")
   fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
      val employees : Flux<Employee> = client.get().uri("/employees/organization/$id")
             .retrieve().bodyToFlux(Employee::class.java)
      val org : Mono<Organization> = repository.findById(id)
      return org.zipWith(employees.collectList()).log()
             .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

   @PostMapping
   fun add(@RequestBody organization: Organization) : Mono<Organization> = 
      repository.save(organization)

}
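The zipWith call above waits for both asynchronous sources and combines their results into a single object. If you are more familiar with the standard library, the same pattern can be sketched with CompletableFuture.thenCombine (an analogy only, not the Reactor API):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ZipSketch {
    public static void main(String[] args) {
        // Two independent async sources, like repository.findById(id)
        // and the WebClient call collecting the employees
        CompletableFuture<String> orgName =
            CompletableFuture.supplyAsync(() -> "Test");
        CompletableFuture<List<String>> employees =
            CompletableFuture.supplyAsync(() -> List.of("Emp1", "Emp2"));

        // Combine both results once each completes,
        // like org.zipWith(employees.collectList()).map(...)
        String combined = orgName
            .thenCombine(employees, (name, list) -> name + " has " + list.size() + " employees")
            .join();

        System.out.println(combined); // Test has 2 employees
    }
}
```

The key property is the same in both APIs: neither source blocks the other, and the combining function runs only when both results are available.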

Here’s the implementation of our DTO:

data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList<Employee> = ArrayList()
    constructor(employees: MutableList<Employee>) : this(null, "") {
        this.employees = employees
    }
    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name) {
        this.employees = employees
    }
}

Integration Testing

Once we have finished the implementation, we can prepare several integration tests. As I mentioned at the beginning, we will use Testcontainers for running the Postgres container during the tests. Our test runs the app and leverages the auto-configured instance of WebTestClient to call the API endpoints (1). We need to start the Postgres container before the tests, so we define the container bean inside the companion object section (2). With the @ServiceConnection annotation we don’t have to set the connection properties manually – Spring Boot will do it for us (3).

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class EmployeeControllerTests {

   @Autowired
   private lateinit var webTestClient: WebTestClient // (1)

   companion object { // (2)

      @Container
      @ServiceConnection // (3)
      val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
         withDatabaseName("spring")
         withUsername("spring")
         withPassword("spring123")
      }

   }

   @Test
   @Order(1)
   fun shouldStart() {

   }

   @Test
   @Order(2)
   fun shouldAddEmployee() {
      webTestClient.post().uri("/employees")
          .contentType(MediaType.APPLICATION_JSON)
          .bodyValue(Employee("Test", 1000, 1))
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindEmployee() {
      webTestClient.get().uri("/employees/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindEmployees() {
      webTestClient.get().uri("/employees")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody().jsonPath("$.length()").isEqualTo(1)
          .jsonPath("$[0].id").isNotEmpty
   }

}

The test class for the organization-service is a little bit more complicated. That’s because we need to mock the communication with the employee-service. In order to do that, we use the ClientAndServer object (1). It is started once before all the tests (2) and stopped after them (3). We mock the GET /employees/organization/{id} endpoint, which is invoked by the organization-service (4). Then we call the organization-service GET /organizations/{id}/with-employees endpoint (5). Finally, we verify that it returns the list of employees inside the JSON response.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class OrganizationControllerTests {

   @Autowired
   private lateinit var webTestClient: WebTestClient

   companion object {

      @Container
      @ServiceConnection
      val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
         withDatabaseName("spring")
         withUsername("spring")
         withPassword("spring123")
      }

      private var mockServer: ClientAndServer? = null // (1)

      @BeforeAll
      @JvmStatic
      internal fun beforeAll() { // (2)
         mockServer = ClientAndServer.startClientAndServer(8090)
      }

      @AfterAll
      @JvmStatic
      internal fun afterAll() { // (3)
         mockServer!!.stop()
      }
   }

   @Test
   @Order(1)
   fun shouldStart() {

   }

   @Test
   @Order(2)
   fun shouldAddOrganization() {
        webTestClient.post().uri("/organizations")
          .contentType(MediaType.APPLICATION_JSON)
          .bodyValue(Organization("Test"))
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganization() {
        webTestClient.get().uri("/organizations/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganizations() {
        webTestClient.get().uri("/organizations")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody().jsonPath("$.length()").isEqualTo(1)
          .jsonPath("$[0].id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganizationWithEmployees() { // (4)
        mockServer!!.`when`(request()
               .withMethod("GET")
               .withPath("/employees/organization/1"))
            .respond(response()
                .withStatusCode(200)
                .withContentType(MediaType.APPLICATION_JSON)
                .withBody(createEmployees()))

      webTestClient.get().uri("/organizations/1/with-employees")
          .accept(MediaType.APPLICATION_JSON) // (5)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
          .jsonPath("$.employees.length()").isEqualTo(2)
          .jsonPath("$.employees[0].id").isEqualTo(1)
          .jsonPath("$.employees[1].id").isEqualTo(2)
   }

   private fun createEmployees(): String {
      val employees: List<Employee> = listOf<Employee>(
         Employee(1, "Test1", 10000, 1),
         Employee(2, "Test2", 20000, 1)
      )
      return jacksonObjectMapper().writeValueAsString(employees)
   }
}

You can easily verify that all the tests finish successfully by running them on your laptop. After cloning the repository, you need to start Docker and build the apps with the following Maven command:

$ mvn clean package

We can also prepare the build definition for our apps on CircleCI. Since we need to run Testcontainers, we need a machine executor with a Docker daemon. Here’s the configuration of the build pipeline for CircleCI inside the .circleci/config.yml file:

version: 2.1

jobs:
  build:
    docker:
      - image: 'cimg/openjdk:20.0'
    steps:
      - checkout
      - run:
          name: Analyze on SonarCloud
          command: mvn verify sonar:sonar -DskipTests

executors:
  machine_executor_amd64:
    machine:
      image: ubuntu-2204:2022.04.2
    environment:
      architecture: "amd64"
      platform: "linux/amd64"

orbs:
  maven: circleci/maven@1.4.1

workflows:
  maven_test:
    jobs:
      - maven/test:
          executor: machine_executor_amd64
      - build:
          context: SonarCloud

Here’s the result of the build on CircleCI:

[screenshot: CircleCI build result for the spring-boot-reactive-postgres project]

If you have Docker running you can also start our Spring Boot reactive apps with the Postgres container. It is possible thanks to the spring-boot-testcontainers module. There is a dedicated @TestConfiguration class that may be used to run Postgres in dev mode:

@TestConfiguration
class PostgresContainerDevMode {

    @Bean
    @ServiceConnection
    fun postgresql(): PostgreSQLContainer<*>? {
        return PostgreSQLContainer("postgres:14.0")
            .withUsername("spring")
            .withPassword("spring123")
    }
}

Now, we need to define the “test” main class that uses the configuration provided within the PostgresContainerDevMode class.

class EmployeeApplicationTest

fun main(args: Array<String>) {
    fromApplication<EmployeeApplication>()
       .with(PostgresContainerDevMode::class)
       .run(*args)
}

In order to run the app in dev mode with Postgres on Docker, just execute the following Maven command:

$ mvn spring-boot:test-run

A Deep Dive Into Spring WebFlux Threading Model
Mon, 30 Mar 2020

If you are building reactive applications with Spring WebFlux, you will typically use Reactor Netty as the default embedded server. Reactor Netty is currently one of the most popular asynchronous event-driven frameworks. It provides non-blocking and backpressure-ready TCP, HTTP, and UDP clients and servers. In fact, the most important difference between synchronous and reactive frameworks lies in their threading and concurrency model. Without understanding how a reactive framework handles threads, you won’t fully understand reactivity. Let’s take a closer look at the threading model realized by Spring WebFlux and Project Reactor.
We should begin with the following fragment of the Spring Framework Reference:

On a “vanilla” Spring WebFlux server (for example, no data access nor other optional dependencies), you can expect one thread for the server and several others for request processing (typically as many as the number of CPU cores).

So, the number of processing threads that handle incoming requests is related to the number of CPU cores. If you have 4 cores and you haven’t enabled the hyper-threading mechanism, you have 4 worker threads in the pool. You can determine the number of cores available to the JVM with the static method Runtime.getRuntime().availableProcessors().
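You can check this value with a few lines of plain Java:

```java
public class AvailableCores {
    public static void main(String[] args) {
        // Number of CPU cores visible to the JVM
        // (on Java 10+ this respects Docker CPU limits)
        int cores = Runtime.getRuntime().availableProcessors();
        System.out.println("CPU: " + cores);
    }
}
```

The printed value depends on the machine, and, as we will see in a moment, on container limits.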

Example

At this point, it is worth referring to the example application. It is a simple Spring Boot application that exposes a reactive API built using Spring WebFlux. The source code is available on GitHub: https://github.com/piomin/sample-spring-webflux.git. You can run it after building with the java -jar command, or simply run it from your IDE. It also contains a Dockerfile in the root directory, so you can build and run it inside a Docker container.

Limit CPU

With Docker, you can easily limit the number of CPU cores “visible” to your application. When running a Docker container, you just need to set the cpus parameter as shown below.

$ docker run -d --name webflux --cpus="1" -p 8080:8080 piomin/sample-spring-webflux

The sample application is printing the number of available CPU cores during startup. Here’s the fragment of code responsible for it.

@SpringBootApplication
public class SampleSpringWebFluxApp {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxApp.class);

    public static void main(String[] args) {
        SpringApplication.run(SampleSpringWebFluxApp.class, args);
    }

    @PostConstruct
    public void init() {
        LOGGER.info("CPU: {}", Runtime.getRuntime().availableProcessors());
    }

}

Now, we can send some test requests to one of the available endpoints.

$ curl http://192.168.99.100:8080/persons/json

After that, we may take a look at the logs generated by the application using the docker logs -f webflux command. The result is pretty unexpected. We still have four worker threads in the pool, although the application uses only a single CPU core. Here’s a screenshot of the application logs.

[screenshot: application logs showing four worker threads despite the single-CPU limit]

The explanation is pretty simple. Here’s the fragment of the Reactor Netty source code that illustrates the algorithm used. The minimum number of worker threads in the pool is 4.

[screenshot: Reactor Netty source fragment with the worker thread count algorithm]
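In essence, the sizing logic boils down to something like the following sketch (a simplification for illustration, not the actual Reactor Netty source):

```java
public class WorkerCount {
    // Sketch of Reactor Netty's default event-loop sizing:
    // use the number of available cores, but never fewer than 4 threads.
    static int defaultIoWorkerCount() {
        return Math.max(Runtime.getRuntime().availableProcessors(), 4);
    }

    public static void main(String[] args) {
        // On a 1-CPU container this still evaluates to 4
        System.out.println("worker threads: " + defaultIoWorkerCount());
    }
}
```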

Well, this exactly explains why my application uses the same number of threads when it is running on my laptop with 4 CPUs and in a Docker container that limits the CPU to a single core. You should also remember that Java has been able to see the number of CPU cores inside a Docker container since version 10. If you use Java 8 on Docker, your application sees the CPUs of the machine, not of the container, which may have a huge impact on the consumed resources.

WebClient Thread Pool

Let’s consider the following implementation of a reactive endpoint. First, we emit a stream of three Person objects, and then we merge it with the stream returned by the WebClient.

@Autowired
WebClient client;

private Stream<Person> prepareStreamPart1() {
   return Stream.of(
      new Person(1, "Name01", "Surname01", 11),
      new Person(2, "Name02", "Surname02", 22),
      new Person(3, "Name03", "Surname03", 33)
   );
}

@GetMapping("/integration/{param}")
public Flux<Person> findPersonsIntegration(@PathVariable("param") String param) {
   return Flux.fromStream(this::prepareStreamPart1).log()
      .mergeWith(
         client.get().uri("/slow/" + param)
            .retrieve()
            .bodyToFlux(Person.class)
            .log()
      );
}

In order to properly understand what exactly happens with the thread pool, let’s recall the following fragment from the Spring Framework Reference.

The reactive WebClient operates in event loop style. So you can see a small, fixed number of processing threads related to that (for example, reactor-http-nio- with the Reactor Netty connector). However, if Reactor Netty is used for both client and server, the two share event loop resources by default.

Typically, you have 4 worker threads, which are shared between server-side request processing and client-side communication with other resources. Since we have enabled detailed logging with the log method, we may easily analyze the thread pooling step by step for a single request sent to the test endpoint. The incoming request is processed by the reactor-http-nio-2 thread. Then WebClient is called and subscribes to the result stream on the same thread. But the result is published on a different thread, reactor-http-nio-3, the first available in the pool.

[screenshot: WebClient logs showing the subscription and publication threads]

We can also analyze the subsequent logs for a single selected thread. In the following fragment of logs for the thread reactor-http-nio-1, you may see that WebClient does not block the thread while waiting for a result from the called resource. The onComplete method is called here four times in a row without any onSubscribe call. The idea of non-blocking threads is the main concept behind reactive programming, which allows us to handle many requests using a relatively small number of threads (4 in this case).


15:02:25.875 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.875 --- [ctor-http-nio-1] : request(32)
15:02:25.891 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:25.891 --- [ctor-http-nio-1] : | request(32)
15:02:25.891 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:25.892 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:25.892 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:25.892 --- [ctor-http-nio-1] : | onComplete()
15:02:25.894 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.895 --- [ctor-http-nio-1] : request(32)
15:02:25.924 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:25.925 --- [ctor-http-nio-1] : | request(32)
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:25.926 --- [ctor-http-nio-1] : | onComplete()
15:02:25.927 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.927 --- [ctor-http-nio-1] : request(32)
15:02:26.239 --- [ctor-http-nio-1] : onNext(Person(id=38483, firstName=Name6, lastName=Surname6, age=50))
15:02:26.241 --- [ctor-http-nio-1] : onNext(Person(id=59103, firstName=Name6, lastName=Surname6, age=6))
15:02:26.241 --- [ctor-http-nio-1] : onNext(Person(id=67587, firstName=Name6, lastName=Surname6, age=36))
15:02:26.242 --- [ctor-http-nio-1] : onComplete()
15:02:26.283 --- [ctor-http-nio-1] : onNext(Person(id=40006, firstName=Name2, lastName=Surname2, age=41))
15:02:26.284 --- [ctor-http-nio-1] : onNext(Person(id=94727, firstName=Name2, lastName=Surname2, age=79))
15:02:26.287 --- [ctor-http-nio-1] : onNext(Person(id=19891, firstName=Name2, lastName=Surname2, age=84))
15:02:26.287 --- [ctor-http-nio-1] : onComplete()
15:02:26.292 --- [ctor-http-nio-1] : onNext(Person(id=67550, firstName=Name14, lastName=Surname14, age=92))
15:02:26.293 --- [ctor-http-nio-1] : onNext(Person(id=85063, firstName=Name14, lastName=Surname14, age=76))
15:02:26.293 --- [ctor-http-nio-1] : onNext(Person(id=54572, firstName=Name14, lastName=Surname14, age=18))
15:02:26.293 --- [ctor-http-nio-1] : onComplete()
15:02:26.295 --- [ctor-http-nio-1] : onNext(Person(id=49896, firstName=Name16, lastName=Surname16, age=20))
15:02:26.295 --- [ctor-http-nio-1] : onNext(Person(id=85684, firstName=Name16, lastName=Surname16, age=39))
15:02:26.296 --- [ctor-http-nio-1] : onNext(Person(id=2605, firstName=Name16, lastName=Surname16, age=75))
15:02:26.296 --- [ctor-http-nio-1] : onComplete()
15:02:26.337 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:26.338 --- [ctor-http-nio-1] : | request(32)
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:26.340 --- [ctor-http-nio-1] : | onComplete()
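The same non-blocking idea can be illustrated without Reactor at all. In the following plain-JDK sketch, the caller registers a callback for a slow result and immediately continues, just like the event-loop thread above (an analogy only, not how Netty is implemented internally):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class NonBlockingSketch {
    public static void main(String[] args) {
        // Simulate a slow remote call running on a background thread
        CompletableFuture<String> slowCall = CompletableFuture.supplyAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return "response";
        });

        // The caller registers a callback instead of blocking on the result
        CompletableFuture<Void> done =
            slowCall.thenAccept(body -> System.out.println("received: " + body));

        // Meanwhile the calling thread is free to do other work
        System.out.println("caller keeps working while the call is in flight");

        done.join(); // only for the demo, so the JVM does not exit before the callback fires
    }
}
```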

Long Response Time

If you are using WebClient for communication with other resources, you should be able to handle long response times. Of course, WebClient does not block the thread, but sometimes it is desirable to use a thread pool other than the main worker pool shared with the server. To do that, you should use the publishOn method. Project Reactor provides a thread pool abstraction called Schedulers. You may use it to create different concurrency strategies. If you prefer to have full control over the minimum and maximum number of threads in the pool, you should define your own task executor as shown below. The minimum size of the pool is 5, while the maximum size is 10.

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
   ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
   executor.setCorePoolSize(5);
   executor.setMaxPoolSize(10);
   executor.setQueueCapacity(100);
   executor.setThreadNamePrefix("slow-");
   executor.initialize();
   return executor;
}
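For reference, ThreadPoolTaskExecutor is a Spring convenience wrapper around the JDK’s ThreadPoolExecutor, so an equivalent pool can be built directly from the standard library (a sketch mirroring the sizing above; the 60-second keep-alive is an assumption, since the Spring bean relies on its default there):

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SlowPool {
    static ThreadPoolExecutor slowPool() {
        // core 5, max 10, queue capacity 100 - the same sizing as the Spring bean above
        return new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
                new LinkedBlockingQueue<>(100),
                r -> new Thread(r, "slow-" + System.nanoTime()));
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pool = slowPool();
        System.out.println(pool.getCorePoolSize() + "/" + pool.getMaximumPoolSize());
        pool.shutdown();
    }
}
```

Note that with a bounded queue the pool only grows beyond its core size once the queue is full, which is also how the Spring wrapper behaves.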

Then you just need to wrap the task executor in a Scheduler for the publishOn method. Here’s our second test endpoint, which sets a dedicated thread pool for processing the WebClient results.

@GetMapping("/integration-in-different-pool/{param}")
public Flux<Person> findPersonsIntegrationInDifferentPool(@PathVariable("param") String param) {
   return Flux.fromStream(this::prepareStreamPart1).log()
      .mergeWith(
         client.get().uri("/slow/" + param)
            .retrieve()
            .bodyToFlux(Person.class)
            .log()
            .publishOn(Schedulers.fromExecutor(taskExecutor))
         );
}

Performance Testing

Now, our main goal is to compare the performance of both implemented endpoints. The first uses the same thread pool for server request processing and for WebClient, while the second creates a separate thread pool for WebClient with publishOn. The first step in preparing such a comparison test is to create a mock with a delayed response. We will use the MockWebServer provided by OkHttp. It catches requests sent to the /slow/{param} endpoint and returns a list of Person objects with the response body delayed by 200 ms.

public static MockWebServer mockBackEnd;

@BeforeClass
public static void setUp() throws IOException {
   final Dispatcher dispatcher = new Dispatcher() {

      @NotNull
      @Override
      public MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws InterruptedException {
         String pathParam = recordedRequest.getPath().replaceAll("/slow/", "");
         List<Person> personsPart2 = List.of(new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)),
            new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)),
            new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)));
         try {
            return new MockResponse()
               .setResponseCode(200)
               .setBody(mapper.writeValueAsString(personsPart2))
               .setHeader("Content-Type", "application/json")
               .setBodyDelay(200, TimeUnit.MILLISECONDS);
         }
         catch (JsonProcessingException e) {
            e.printStackTrace();
         }
         return null;
      }
   };
   mockBackEnd = new MockWebServer();
   mockBackEnd.setDispatcher(dispatcher);
   mockBackEnd.start();
   System.setProperty("target.uri", "http://localhost:" + mockBackEnd.getPort());
}

@AfterClass
public static void tearDown() throws IOException {
   mockBackEnd.shutdown();
}

The last step in our implementation is to create the test methods. For calling both sample endpoints we can use the synchronous TestRestTemplate, since it doesn’t affect the results. For benchmarking I’m using the junit-benchmarks library, which allows me to define the number of concurrent threads and the maximum number of attempts. To enable that library for a JUnit test we just need to define a @Rule. Here’s the implementation of our performance test. We are running Spring Boot in test mode and then starting 50 concurrent threads with 300 attempts.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@RunWith(SpringRunner.class)
public class PerformanceSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(PerformanceSpringWebFluxTest.class);
    private static ObjectMapper mapper = new ObjectMapper();
    private static Random r = new Random();
    private static int i = 0;

    @Rule
    public TestRule benchmarkRun = new BenchmarkRule();
    @Autowired
    TestRestTemplate template;
   
    @Test
    @BenchmarkOptions(warmupRounds = 10, concurrency = 50, benchmarkRounds = 300)
    public void testPerformance() throws InterruptedException {
        ResponseEntity<Person[]> r = template.exchange("/persons/integration/{param}", HttpMethod.GET, null, Person[].class, ++i);
        Assert.assertEquals(200, r.getStatusCodeValue());
        Assert.assertNotNull(r.getBody());
        Assert.assertEquals(6, r.getBody().length);
    }

    @Test
    @BenchmarkOptions(warmupRounds = 10, concurrency = 50, benchmarkRounds = 300)
    public void testPerformanceInDifferentPool() throws InterruptedException {
        ResponseEntity<Person[]> r = template.exchange("/persons/integration-in-different-pool/{param}", HttpMethod.GET, null, Person[].class, ++i);
        Assert.assertEquals(200, r.getStatusCodeValue());
        Assert.assertNotNull(r.getBody());
        Assert.assertEquals(6, r.getBody().length);
    }
}

Here are the results. For the endpoint with the default Spring WebFlux threading model, which assumes a thread pool shared between server processing and client requests, it takes ~5.5s to process all 300 requests. For the method with a separate thread pool for WebClient it takes ~2.9s for all 300 requests. In general, the more threads you use in your pool, the more the ratio of average processing times shifts in favor of the second model (with the separate thread pool for the client).

spring-webflux-threading-model-testresult1

And here’s the result for the separated WebClient thread pool.

spring-webflux-threading-model-testresult2

Using Spring Boot Actuator

Sometimes you need to use some additional libraries in your WebFlux application. One of them is Spring Boot Actuator, which may be especially important if you have to expose health check or metrics endpoints. To find out how Spring WebFlux handles threading for Spring Boot Actuator, we will profile our application with YourKit. After running the application you should generate some traffic to the /actuator/health endpoint. Here’s the screen with threads from YourKit. As you can see, a worker thread (reactor-http-nio-*) delegates long-running jobs to the threads available in the bounded elastic pool (boundedElastic-*).

spring-webflux-threading-model-yourkit

Threads from the bounded elastic pool are checking free disk space, which is by default part of the Spring Boot Actuator /health endpoint.

spring-webflux-threading-model-diskspace
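If the disk-space indicator is not relevant for your deployment, Spring Boot lets you switch it off (or tune its threshold) via configuration properties. A sketch for application.properties, assuming the standard management.health.diskspace.* properties:

```properties
# Disable the built-in disk-space health indicator
management.health.diskspace.enabled=false
# ...or keep it, but change the free-space threshold below which the status becomes DOWN
#management.health.diskspace.threshold=100MB
```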

The post A Deep Dive Into Spring WebFlux Threading Model appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2020/03/30/a-deep-dive-into-spring-webflux-threading-model/feed/ 19 7878
Micronaut Tutorial: Reactive https://piotrminkowski.com/2019/11/12/micronaut-tutorial-reactive/ https://piotrminkowski.com/2019/11/12/micronaut-tutorial-reactive/#respond Tue, 12 Nov 2019 10:17:00 +0000 https://piotrminkowski.wordpress.com/?p=7451 This is the fourth part of my tutorial to Micronaut Framework – created after a longer period of time. In this article I’m going to show you some examples of reactive programming on the server and client side. By default, Micronaut support to reactive APIs and streams is built on top of RxJava. If you […]

The post Micronaut Tutorial: Reactive appeared first on Piotr's TechBlog.

]]>
This is the fourth part of my tutorial to the Micronaut Framework – created after a longer break. In this article I’m going to show you some examples of reactive programming on the server and client side. By default, Micronaut’s support for reactive APIs and streams is built on top of RxJava. If you are interested in the previous parts of my tutorial and would like to read them before starting with this part, you can learn about the basics, security and server-side applications here:

Reactive programming has become increasingly popular recently. Therefore, all newly created web frameworks support it by default. Micronaut is no different. In this part of the tutorial you will learn how to:

  • Use the RxJava framework with Micronaut on the server and client side
  • Stream JSON over HTTP
  • Use the low-level HTTP client and the declarative HTTP client for retrieving a reactive stream
  • Regulate back pressure on the server and client side
  • Test a reactive API with JUnit

1. Dependencies

Support for reactive programming with RxJava is enabled by default in Micronaut. The dependency io.reactivex.rxjava2:rxjava is included together with some core Micronaut libraries like micronaut-runtime or micronaut-http-server-netty on the server side, and with micronaut-http-client on the client side. So, the set of dependencies is the same as in the example from Part 2 of my tutorial, which described an approach to building a classic web application using the Micronaut Framework. Just to recap, here’s the list of the most important dependencies in this tutorial:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-inject</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-runtime</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-server-netty</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-management</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-client</artifactId>
   <scope>test</scope>
</dependency>
<dependency>
   <groupId>io.micronaut.test</groupId>
   <artifactId>micronaut-test-junit5</artifactId>
   <scope>test</scope>
</dependency>

2. Controller

Let’s begin with the server-side application and a controller implementation. There are some optional annotations for enabling validation (@Validated) and ignoring security constraints (@Secured(SecurityRule.IS_ANONYMOUS)) described in the previous parts of this tutorial. However, the most important things in the implementation of the controller visible below are the RxJava objects used in the return statements: Single, Maybe and Flowable. The rest of the implementation is pretty similar to a standard REST controller.

@Controller("/persons/reactive")
@Secured(SecurityRule.IS_ANONYMOUS)
@Validated
public class PersonReactiveController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveController.class);

    List<Person> persons = new ArrayList<>();

    @Post
    public Single<Person> add(@Body @Valid Person person) {
        person.setId(persons.size() + 1);
        persons.add(person);
        return Single.just(person);
    }

    @Get("/{id}")
    public Maybe<Person> findById(@PathVariable Integer id) {
        // Return an empty Maybe instead of throwing when no person matches the given id
        return persons.stream()
                .filter(person -> person.getId().equals(id))
                .findAny()
                .map(Maybe::just)
                .orElseGet(Maybe::empty);
    }

    @Get(value = "/stream", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStream() {
        return Flowable.fromIterable(persons).doOnNext(person -> LOGGER.info("Server: {}", person));
    }

}

In the implementation of controller visible above I used 3 of 5 available RxJava2 types that can be observed:

  • Single – an observable that emits only one item and then completes. It guarantees that exactly one item will be emitted, so it’s for non-empty outputs.
  • Maybe – works very similarly to a Single, but with one particular difference: it can complete without emitting a value. This is useful when we have optional emissions.
  • Flowable – emits a stream of elements and supports the back pressure mechanism

Now, if you understand the meaning of the basic observable types in RxJava, the example controller should be pretty easy for you. We have three methods: add for adding a new element to the list, findById for searching by id, which may not return any element, and findAllStream, which emits all elements as a stream. The last method has to produce application/x-json-stream in order to take advantage of reactive streams also on the client side. With that content type, events are retrieved continuously by the HTTP client thanks to the Flowable type.

3. Using Low-level Reactive HTTP Client

Micronaut offers two types of clients for accessing HTTP APIs: a low-level client and a declarative client. We can choose between HttpClient, RxHttpClient and RxStreamingHttpClient with support for streaming data over HTTP. The recommended way of obtaining a reference to a client is by injecting it with the @Client annotation. However, the most suitable way inside a JUnit test is the static create method of RxHttpClient, since we may set the port number dynamically.
Here’s the implementation of JUnit tests with the low-level HTTP client. To read a Single or Maybe we are using the retrieve method of RxHttpClient. After subscribing to an observable I’m using the ConcurrentUnit library for handling asynchronous results in the test. For accessing a Flowable returned as application/x-json-stream on the server side we need to use RxStreamingHttpClient. It provides the jsonStream method, which is dedicated to reading a non-blocking stream of JSON objects.

@MicronautTest
public class PersonReactiveControllerTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveControllerTests.class);

    @Inject
    EmbeddedServer server;

    @Test
    public void testAdd() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        final Person person = new Person(null, "Name100", "Surname100", 22, Gender.MALE);
        RxHttpClient client = RxHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        Single<Person> s = client.retrieve(HttpRequest.POST("/persons/reactive", person), Person.class).firstOrError();
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertNotNull(person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindById() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        RxHttpClient client = RxHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        Maybe<Person> s = client.retrieve(HttpRequest.GET("/persons/reactive/1"), Person.class).firstElement();
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertEquals(1, person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindAllStream() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        client.jsonStream(HttpRequest.GET("/persons/reactive/stream"), Person.class)
                .subscribe(s -> {
                    LOGGER.info("Client: {}", s);
                    waiter.assertNotNull(s);
                    waiter.resume();
                });
        waiter.await(3000, TimeUnit.MILLISECONDS, 9);
    }
   
}

4. Back Pressure

One of the main terms related to reactive programming is back pressure. Back pressure is resistance or a force opposing the desired flow of data through software. In simple words, if a producer sends more events than a consumer is able to handle in a given period of time, the consumer should be able to regulate the frequency of sending events on the producer side. Of course, Micronaut supports back pressure. Let’s analyze how we may control it in a Micronaut application with some simple examples.
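The request(n) contract that powers back pressure comes from the Reactive Streams specification, whose interfaces also ship with the JDK since Java 9 as java.util.concurrent.Flow. Here’s a minimal, framework-independent sketch (plain Java, not Micronaut or RxJava) in which the subscriber pulls one element at a time:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackPressureDemo {

    public static List<Integer> consumeOneByOne(List<Integer> input) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // ask for the first element only
                }

                @Override
                public void onNext(Integer item) {
                    received.add(item);
                    subscription.request(1); // pull the next element after processing this one
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown();
                }
            });
            input.forEach(publisher::submit); // submit blocks when the internal buffer is full
        } // closing the publisher triggers onComplete once all items are consumed
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(consumeOneByOne(List.of(1, 2, 3, 4, 5))); // [1, 2, 3, 4, 5]
    }
}
```

Because the subscriber only ever requests one element after finishing the previous one, a fast producer eventually fills the publisher’s buffer and submit blocks, which is exactly the regulating effect described above.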
We may “somehow” control back pressure on the server side and also on the client side. However, the client is not able to regulate the emission parameters on the server side due to the TCP transport layer. Ok, now let’s implement additional methods in our sample controller. I’m using fromCallable for emitting elements within the Flowable stream. We are producing 9 elements by calling the repeat method. The second newly created method is findAllStreamWithCallableDelayed, which additionally delays each element in the stream by 100 milliseconds. In this way, we can control back pressure on the server side.

@Controller("/persons/reactive")
@Secured(SecurityRule.IS_ANONYMOUS)
@Validated
public class PersonReactiveController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveController.class);

    List<Person> persons = new ArrayList<>();
   
    @Get(value = "/stream/callable", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStreamWithCallable() {
        return Flowable.fromCallable(() -> {
            int r = new Random().nextInt(100);
            Person p = new Person(r, "Name"+r, "Surname"+r, r, Gender.MALE);
            return p;
        }).doOnNext(person -> LOGGER.info("Server: {}", person))
                .repeat(9);
    }

    @Get(value = "/stream/callable/delayed", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStreamWithCallableDelayed() {
        return Flowable.fromCallable(() -> {
            int r = new Random().nextInt(100);
            Person p = new Person(r, "Name"+r, "Surname"+r, r, Gender.MALE);
            return p;
        }).doOnNext(person -> LOGGER.info("Server: {}", person))
                .repeat(9).delay(100, TimeUnit.MILLISECONDS);
    }
   
}

We can also control back pressure on the client side. However, as I have mentioned before, it does not have any effect on the emission on the server side. Now, let’s consider the following test, which verifies the delayed stream on the server side.

@Test
public void testFindAllStreamDelayed() throws MalformedURLException, TimeoutException, InterruptedException {
   final Waiter waiter = new Waiter();
   RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
   client.jsonStream(HttpRequest.GET("/persons/reactive/stream/callable/delayed"), Person.class)
      .subscribe(s -> {
         LOGGER.info("Client: {}", s);
         waiter.assertNotNull(s);
         waiter.resume();
      });
   waiter.await(3000, TimeUnit.MILLISECONDS, 9);
}

Here’s the result of the test. Since the server delays every element by 100 milliseconds, the client receives and prints each element just after its emission.

micronaut-tut-4-1

For comparison, let’s consider the following test. This time we are implementing our own custom Subscriber that requests a single element just after processing the previous one. The following test verifies the non-delayed stream.

@Test
public void testFindAllStreamWithCallable() throws MalformedURLException, TimeoutException, InterruptedException {
   final Waiter waiter = new Waiter();
   RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
   client.jsonStream(HttpRequest.GET("/persons/reactive/stream/callable"), Person.class)
      .subscribe(new Subscriber<Person>() {

         Subscription s;

         @Override
         public void onSubscribe(Subscription subscription) {
            subscription.request(1);
            s = subscription;
         }

         @Override
         public void onNext(Person person) {
            LOGGER.info("Client: {}", person);
            waiter.assertNotNull(person);
            waiter.resume();
            s.request(1);
         }

         @Override
         public void onError(Throwable throwable) {

         }

         @Override
         public void onComplete() {

         }
      });
   waiter.await(3000, TimeUnit.MILLISECONDS, 9);
}

Here’s the result of our test. It finishes successfully, so the subscriber works properly. However, as you can see, the back pressure is controlled just on the client side, since the server emits all the elements first, and then the client retrieves them.

micronaut-tut-4-2

5. Declarative HTTP Client

Besides the low-level HTTP client, Micronaut allows you to create declarative clients via the @Client annotation. If you are familiar with, for example, the OpenFeign project, or you have read the second part of my tutorial to the Micronaut Framework, you probably understand the concept of a declarative client. In fact, we just need to create an interface with methods similar to the methods defined inside the controller and annotate them properly. The client interface should be annotated with @Client as shown below. The interface is placed inside src/test/java since it is used just in JUnit tests.

@Client("/persons/reactive")
public interface PersonReactiveClient {

    @Post
    Single<Person> add(@Body Person person);

    @Get("/{id}")
    Maybe<Person> findById(@PathVariable Integer id);

    @Get(value = "/stream", produces = MediaType.APPLICATION_JSON_STREAM)
    Flowable<Person> findAllStream();

}

The declarative client needs to be injected into the test. Here are the same test methods as implemented for the low-level client, but now with the declarative client.

@MicronautTest
public class PersonReactiveControllerTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveControllerTests.class);

    @Inject
    EmbeddedServer server;
    @Inject
    PersonReactiveClient client;

    @Test
    public void testAddDeclarative() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        final Person person = new Person(null, "Name100", "Surname100", 22, Gender.MALE);
        Single<Person> s = client.add(person);
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertNotNull(person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindByIdDeclarative() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Maybe<Person> s = client.findById(1);
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertEquals(1, person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindAllStreamDeclarative() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flowable<Person> persons = client.findAllStream();
        persons.subscribe(s -> {
            LOGGER.info("Client: {}", s);
            waiter.assertNotNull(s);
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS, 9);
    }

}

Source Code

We were using the same repository as for the two previous parts of my Micronaut tutorial: https://github.com/piomin/sample-micronaut-applications.git.

The post Micronaut Tutorial: Reactive appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2019/11/12/micronaut-tutorial-reactive/feed/ 0 7451
Reactive programming with Project Reactor https://piotrminkowski.com/2018/10/22/reactive-programming-with-project-reactor/ https://piotrminkowski.com/2018/10/22/reactive-programming-with-project-reactor/#respond Mon, 22 Oct 2018 14:51:15 +0000 https://piotrminkowski.wordpress.com/?p=6872 If you are building reactive microservices you would probably have to merge data streams from different source APIs into a single result stream. It inspired me to create this article containing some most common scenarios of using reactive streams in microservice-based architecture during inter-service communication. I have already described some aspects related to reactive programming […]

The post Reactive programming with Project Reactor appeared first on Piotr's TechBlog.

]]>
If you are building reactive microservices, you will probably have to merge data streams from different source APIs into a single result stream. That inspired me to create this article, containing some of the most common scenarios of using reactive streams in a microservice-based architecture during inter-service communication.

I have already described some aspects related to reactive programming with Spring based on Spring WebFlux and Spring Data JDBC projects in the following articles:

Spring Framework has supported reactive programming since version 5. That support is built on top of Project Reactor – https://projectreactor.io. Reactor is a fourth-generation reactive programming library for building non-blocking applications on the JVM, based on the Reactive Streams Specification. Working with this library can be difficult at first, especially if you don’t have any experience with reactive streams. Reactor Core gives us two data types that enable us to produce a stream of data: Mono and Flux. With Flux we can emit 0..n elements, while with Mono we can create a stream of 0..1 elements. Both types implement the Publisher interface. Both types are also lazy, which means they won’t be executed until you consume them. Therefore, when building reactive APIs it is important not to block the stream. Spring WebFlux doesn’t allow that.
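This laziness is the same idea you may know from java.util.stream pipelines, where intermediate operations do nothing until a terminal operation runs. A quick plain-Java illustration (not Reactor itself, just the analogous behavior):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazinessDemo {
    public static void main(String[] args) {
        AtomicInteger evaluations = new AtomicInteger();

        // Assembling the pipeline evaluates nothing yet (like building a Mono/Flux)
        Stream<Integer> pipeline = Stream.of(1, 2, 3)
                .map(i -> {
                    evaluations.incrementAndGet();
                    return i * 10;
                });
        System.out.println(evaluations.get()); // prints 0

        // The terminal operation plays the role of subscribe(): now the work happens
        List<Integer> result = pipeline.collect(Collectors.toList());
        System.out.println(evaluations.get()); // prints 3
        System.out.println(result);            // prints [10, 20, 30]
    }
}
```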

Introduction

The sample project is available on GitHub in the repository reactive-playground: https://github.com/piomin/reactive-playground.git. It is written in Kotlin. In addition to some Kotlin libraries, only a single dependency needs to be added in order to use Project Reactor: reactor-core.

<dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-core</artifactId>
   <version>3.2.1.RELEASE</version>
</dependency>

I wouldn’t like to show you the features of Project Reactor based on simple String objects like many other articles do. Therefore, I have created the following class hierarchy for our tests, which allows us to simulate APIs built for three different domain objects.

reactive-programming-4

Class Organization contains a list of Employee and a list of Department objects. Each department contains a list of Employee objects assigned only to the given department inside the organization. Class Employee has the properties organizationId, which assigns it to an organization, and departmentId, which assigns it to a department.

data class Employee(var id: Int, var name: String, var salary: Int) {
    var organizationId: Int? = null
    var departmentId: Int? = null

    constructor(id: Int, name: String, salary: Int, organizationId: Int, departmentId: Int) : this(id, name, salary) {
        this.organizationId = organizationId
        this.departmentId = departmentId
    }

    constructor(id: Int, name: String, salary: Int, organizationId: Int) : this(id, name, salary) {
        this.organizationId = organizationId
    }
}

Here’s the implementation of Department class.

class Department(var id: Int, var name: String, var organizationId: Int) {
    var employees: MutableList<Employee> = ArrayList()

    constructor(id: Int, name: String, organizationId: Int, employees: MutableList<Employee>) : this(id, name, organizationId) {
        this.employees.addAll(employees)
    }

    fun addEmployees(employees: MutableList<Employee>) : Department {
        this.employees.addAll(employees)
        return this
    }

    fun addEmployee(employee: Employee) : Department {
        this.employees.add(employee)
        return this
    }

}

Here’s the implementation of Organization class.

class Organization(var id: Int, var name: String) {
    var employees: MutableList<Employee> = ArrayList()
    var departments: MutableList<Department> = ArrayList()

    constructor(id: Int, name: String, employees: MutableList<Employee>, departments: MutableList<Department>) : this(id, name){
        this.employees.addAll(employees)
        this.departments.addAll(departments)
    }

    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name){
        this.employees.addAll(employees)
    }
}

Scenario 1

We have API methods that return data streams. The first of them returns a Flux emitting the employees assigned to the given organization. The second just returns a Mono with the current organization.

private fun getOrganizationByName(name: String) : Mono<Organization> {
   return Mono.just(Organization(1, name))
}

private fun getEmployeesByOrganization(id: Int) : Flux<Employee> {
   return Flux.just(Employee(1, "Employee1", 1000, id),
                Employee(2, "Employee2", 2000, id))
}

We would like to return a single stream emitting an organization that contains a list of employees, as shown below.

reactor-scenario-1

Here’s the solution. We use the zipWhen method, which waits for the result from the source Mono and then calls the second Mono. Because we can zip only the same stream types (in that case Mono), we need to convert the Flux<Employee> returned by the getEmployeesByOrganization method into a Mono<MutableList<Employee>> using the collectList function. Thanks to zipWhen we can then combine the two Mono streams and create new objects inside the map function.

@Test
fun testScenario1() {
   val organization : Mono<Organization> = getOrganizationByName("test")
      .zipWhen { organization ->
         getEmployeesByOrganization(organization.id!!).collectList()
      }
      .map { tuple -> 
         Organization(tuple.t1.id, tuple.t1.name, tuple.t2)
      }
}
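For readers less familiar with Reactor, the shape of zipWhen (use the first result to start the second call, then combine both) has a close analogue in plain Java’s CompletableFuture composition. A sketch, with simplified record types standing in for the Kotlin classes above:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ZipWhenDemo {

    record Employee(int id, String name, int salary) {}
    record Organization(int id, String name, List<Employee> employees) {}

    static CompletableFuture<Organization> getOrganizationByName(String name) {
        return CompletableFuture.completedFuture(new Organization(1, name, List.of()));
    }

    static CompletableFuture<List<Employee>> getEmployeesByOrganization(int id) {
        return CompletableFuture.completedFuture(
                List.of(new Employee(1, "Employee1", 1000), new Employee(2, "Employee2", 2000)));
    }

    // The zipWhen analogue: use the first result to start the second call, then combine both
    static CompletableFuture<Organization> organizationWithEmployees(String name) {
        return getOrganizationByName(name)
                .thenCompose(org -> getEmployeesByOrganization(org.id())
                        .thenApply(emps -> new Organization(org.id(), org.name(), emps)));
    }

    public static void main(String[] args) {
        Organization org = organizationWithEmployees("test").join();
        System.out.println(org.employees().size()); // 2
    }
}
```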

Scenario 2

Let’s consider another scenario. Now, we have Flux streams that emit employees and departments. Every employee has the property departmentId, responsible for its assignment to a department.

private fun getDepartments() : Flux<Department> {
    return Flux.just(Department(1, "X", 1),
                     Department(2, "Y", 1))
}

private fun getEmployees() : Flux<Employee> {
    return Flux.just(Employee(1, "Employee1", 1000, 1, 1),
            Employee(2, "Employee2", 2000, 1, 1),
            Employee(3, "Employee3", 1000, 1, 2),
            Employee(4, "Employee4", 2000, 1, 2))
}

The goal is to merge those two streams and return a single Flux stream emitting departments that contain all the employees assigned to the given department. Here’s the picture that illustrates the transformation described above.

reactive-programming-reactor-5

We can do that in two ways, as shown below. The first calls the flatMap function on the stream of departments. Inside flatMap we zip every single Department with the stream of employees. That stream is first filtered by departmentId and converted into a Mono type. Finally, we are creating a Mono using the map function, which emits a department containing the list of its employees.
The second way groups the Flux of employees by departmentId. Then it invokes zipping and mapping functions similar to the previous way.

@Test
fun testScenario2() {
   val departments: Flux<Department> = getDepartments()
      .flatMap { department ->
         Mono.just(department)
            .zipWith(getEmployees().filter { it.departmentId == department.id }.collectList())
            .map { t -> t.t1.addEmployees(t.t2) }
      }

   val departments2: Flux<Department> = getEmployees()
      .groupBy { it.departmentId }
      .flatMap { t -> getDepartments().filter { it.id == t.key() }.elementAt(0)
         .zipWith(t.collectList())
         .map { it.t1.addEmployees(it.t2) }
      }
}
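The grouping step itself is conceptually the same as Collectors.groupingBy in plain Java; the reactive version just does it without materializing the whole stream first. A blocking sketch of the same department grouping (simplified types, for illustration only):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupByDemo {

    record Employee(int id, String name, int departmentId) {}

    public static void main(String[] args) {
        List<Employee> employees = List.of(
                new Employee(1, "Employee1", 1),
                new Employee(2, "Employee2", 1),
                new Employee(3, "Employee3", 2),
                new Employee(4, "Employee4", 2));

        // The blocking analogue of Flux.groupBy { it.departmentId }
        Map<Integer, List<Employee>> byDepartment = employees.stream()
                .collect(Collectors.groupingBy(Employee::departmentId));

        System.out.println(byDepartment.get(1).size()); // 2
        System.out.println(byDepartment.get(2).size()); // 2
    }
}
```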

Scenario 3

This scenario is simpler than the two previous ones. We have two API methods that emit Flux streams of the same object type. The first of them contains employees with the id, name and salary properties set, while the second with the id, organizationId and departmentId properties.

private fun getEmployeesBasic() : Flux<Employee> {
   return Flux.just(Employee(1, "AA", 1000),
                        Employee(2, "BB", 2000))
}

private fun getEmployeesRelationships() : Flux<Employee> {
   return Flux.just(Employee(1, 1, 1),
              Employee(2, 1, 2))
}

We want to convert them into a single stream emitting employees with the full set of properties. The following picture illustrates the described transformation.

reactive-programming-reactor-scenario-3

In that case the solution is pretty simple. We are zipping the two Flux streams using the zipWith function, and then mapping the two zipped objects into a single object containing the full set of properties.

@Test
fun testScenario3() {
   val employees : Flux<Employee> = getEmployeesBasic()
      .zipWith(getEmployeesRelationships())
      .map { t -> Employee(t.t1.id, t.t1.name, t.t1.salary, t.t2.organizationId!!, t.t2.departmentId!!) }
}
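The pairwise combination performed by zipWith can be sketched in plain Java as an index-wise merge of two equally long lists (simplified record types, for illustration only):

```java
import java.util.ArrayList;
import java.util.List;

public class ZipListsDemo {

    record Basic(int id, String name, int salary) {}
    record Relationships(int id, int organizationId, int departmentId) {}
    record Full(int id, String name, int salary, int organizationId, int departmentId) {}

    // Combine the i-th element of each list, like zipWith pairs the i-th emissions
    static List<Full> zip(List<Basic> basics, List<Relationships> rels) {
        List<Full> result = new ArrayList<>();
        for (int i = 0; i < Math.min(basics.size(), rels.size()); i++) {
            Basic b = basics.get(i);
            Relationships r = rels.get(i);
            result.add(new Full(b.id(), b.name(), b.salary(), r.organizationId(), r.departmentId()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<Full> full = zip(
                List.of(new Basic(1, "AA", 1000), new Basic(2, "BB", 2000)),
                List.of(new Relationships(1, 1, 1), new Relationships(2, 1, 2)));
        System.out.println(full.size()); // 2
    }
}
```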

Scenario 4

In this scenario we have two independent Flux streams that emit the same type of objects – Employee.

private fun getEmployeesFirstPart() : Flux<Employee> {
   return Flux.just(Employee(1, "AA", 1000), Employee(3, "BB", 3000))
}

private fun getEmployeesSecondPart() : Flux<Employee> {
   return Flux.just(Employee(2, "CC", 2000), Employee(4, "DD", 4000))
}

We would like to merge those two streams into a single stream ordered by id. The following picture shows that transformation.

reactor-scenario-4

Here’s the solution. We use the mergeOrderedWith function with a comparator that compares ids. Then we can perform some transformations on every object, but that is optional; it just shows the usage of the map function.

@Test
fun testScenario4() {
   val persons: Flux<Employee> = getEmployeesFirstPart()
      .mergeOrderedWith(getEmployeesSecondPart(), Comparator { o1, o2 -> o1.id.compareTo(o2.id) })
      .map {
         Employee(it.id, it.name, it.salary, 1, 1)
      }
}
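mergeOrderedWith assumes each source publisher is already ordered and repeatedly picks the smaller head element. The same contract in blocking form is the classic two-pointer merge of sorted lists; a plain-Java sketch with simplified types:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class OrderedMergeDemo {

    record Employee(int id, String name, int salary) {}

    // Merge two already id-sorted lists, always picking the smaller head –
    // the blocking analogue of mergeOrderedWith on two ordered publishers
    static List<Employee> mergeOrdered(List<Employee> a, List<Employee> b,
                                       Comparator<Employee> cmp) {
        List<Employee> merged = new ArrayList<>();
        int i = 0, j = 0;
        while (i < a.size() && j < b.size()) {
            if (cmp.compare(a.get(i), b.get(j)) <= 0) merged.add(a.get(i++));
            else merged.add(b.get(j++));
        }
        while (i < a.size()) merged.add(a.get(i++));
        while (j < b.size()) merged.add(b.get(j++));
        return merged;
    }

    public static void main(String[] args) {
        List<Employee> first = List.of(new Employee(1, "AA", 1000), new Employee(3, "BB", 3000));
        List<Employee> second = List.of(new Employee(2, "CC", 2000), new Employee(4, "DD", 4000));
        List<Employee> merged = mergeOrdered(first, second, Comparator.comparingInt(Employee::id));
        merged.forEach(e -> System.out.println(e.id())); // 1, 2, 3, 4
    }
}
```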

Scenario 5

And the last scenario in this article. We have a single input stream: a Flux emitting the departments of the given organization. Each department in that stream contains the list of all employees assigned to it. Here’s our API method implementation.

private fun getDepartmentsByOrganization(id: Int) : Flux<Department> {
   val dep1 = Department(1, "A", id, mutableListOf(
         Employee(1, "Employee1", 1000, id, 1),
         Employee(2, "Employee2", 2000, id, 1)
      )
   )
   val dep2 = Department(2, "B", id, mutableListOf(
         Employee(3, "Employee3", 1000, id, 2),
         Employee(4, "Employee4", 2000, id, 2)
      )
   )
   return Flux.just(dep1, dep2)
}

The goal is to convert that stream into a single Mono with an Organization that contains the list of all employees from all departments. The following picture visualizes the described transformation.

reactor-scenario-5

Here’s the solution. We invoke the flatMapIterable function, which converts the Flux of Department objects into a Flux of Employee objects by returning the list of employees from each department. Then we collect it into a Mono and add the resulting list to a newly created Organization object inside the map function.

@Test
fun testScenario5() {
   var organization: Mono<Organization> = getDepartmentsByOrganization(1)
      .flatMapIterable { department -> department.employees }
      .collectList()
      .map { t -> Organization(1, "X", t) }
}
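The same transformation can be sketched without Reactor using plain Java streams: flatMap plays the role of flatMapIterable, and collecting into a list mirrors collectList. The record types and helper below are illustrative only, not part of the sample code.

```java
import java.util.List;
import java.util.stream.Collectors;

// Plain-Java sketch of scenario 5: flatten every department's employee list
// into one list, then hand that list to a newly created organization.
class FlattenDepartments {
    record Employee(int id, String name) {}
    record Department(int id, String name, List<Employee> employees) {}
    record Organization(int id, String name, List<Employee> employees) {}

    static Organization toOrganization(int id, String name, List<Department> departments) {
        // flatMapIterable equivalent: one flat stream of employees from all departments
        List<Employee> all = departments.stream()
                .flatMap(d -> d.employees().stream())
                .collect(Collectors.toList());
        // collectList().map(...) equivalent: wrap the collected list in the result object
        return new Organization(id, name, all);
    }
}
```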

The post Reactive programming with Project Reactor appeared first on Piotr's TechBlog.

Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux (Thu, 18 Oct 2018) https://piotrminkowski.com/2018/10/18/introduction-to-reactive-apis-with-postgres-r2dbc-spring-data-jdbc-and-spring-webflux/

There are quite a few technologies listed in the title of this article. Spring WebFlux was introduced with Spring 5 and Spring Boot 2 as a project for building reactive-stack web applications. I have already described how to use it together with Spring Boot and Spring Cloud for building reactive microservices in the article Reactive Microservices with Spring WebFlux and Spring Cloud. Spring 5 has also introduced projects supporting reactive access to NoSQL databases like Cassandra, MongoDB, or Couchbase. What was still missing, however, was support for reactive access to relational databases. That changes with the R2DBC (Reactive Relational Database Connectivity) project, which is also being developed by Pivotal. It is a very interesting initiative, although it is still at an early stage. In any case, there is a module for integration with Postgres, and we will use it for our demo application. R2DBC is not the only new solution described in this article: I will also show you how to use Spring Data JDBC, another interesting project released recently.
It is worth saying a few words about Spring Data JDBC. This project has already been released and is available in version 1.0. It is part of the larger Spring Data framework. It offers a repository abstraction based on JDBC. The main reason for creating this library is to allow access to relational databases in the Spring Data way (through CrudRepository interfaces) without including a JPA library in the application dependencies. Of course, JPA is still the main persistence API used in Java applications. Spring Data JDBC aims to be conceptually much simpler than JPA by not implementing popular patterns like lazy loading, caching, dirty tracking, or sessions. It also provides only very limited support for annotation-based mapping. Finally, it provides an implementation of reactive repositories that uses R2DBC to access a relational database. Although that module is still under development (only a SNAPSHOT version is available), we will try to use it in our demo application. Let’s proceed to the implementation.

Including dependencies

We use Kotlin for implementation. So first, we include some required Kotlin dependencies.

<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-stdlib</artifactId>
   <version>${kotlin.version}</version>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.module</groupId>
   <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-test-junit</artifactId>
   <version>${kotlin.version}</version>
   <scope>test</scope>
</dependency>

We should also add kotlin-maven-plugin with support for Spring.


<plugin>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-maven-plugin</artifactId>
   <version>${kotlin.version}</version>
   <executions>
      <execution>
         <id>compile</id>
         <phase>compile</phase>
         <goals>
            <goal>compile</goal>
         </goals>
      </execution>
      <execution>
         <id>test-compile</id>
         <phase>test-compile</phase>
         <goals>
            <goal>test-compile</goal>
         </goals>
      </execution>
   </executions>
   <configuration>
      <args>
         <arg>-Xjsr305=strict</arg>
      </args>
      <compilerPlugins>
         <plugin>spring</plugin>
      </compilerPlugins>
   </configuration>
</plugin>

Then, we may proceed to including the frameworks required for the demo implementation. We need to include a special SNAPSHOT version of Spring Data JDBC dedicated to accessing databases via R2DBC. We also have to add some R2DBC libraries and Spring WebFlux. As you can see below, only Spring WebFlux is available in a stable version (as part of the Spring Boot release).

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.data</groupId>
   <artifactId>spring-data-jdbc</artifactId>
   <version>1.0.0.r2dbc-SNAPSHOT</version>
</dependency>
<dependency>
   <groupId>io.r2dbc</groupId>
   <artifactId>r2dbc-spi</artifactId>
   <version>1.0.0.M5</version>
</dependency>
<dependency>
   <groupId>io.r2dbc</groupId>
   <artifactId>r2dbc-postgresql</artifactId>
   <version>1.0.0.M5</version>
</dependency>

It is also important to set dependency management for Spring Data project.


<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.data</groupId>
         <artifactId>spring-data-releasetrain</artifactId>
         <version>Lovelace-RELEASE</version>
         <scope>import</scope>
         <type>pom</type>
      </dependency>
   </dependencies>
</dependencyManagement>

Repositories

We are using the well-known Spring Data style of CRUD repository implementation. In this case we need to create an interface that extends the ReactiveCrudRepository interface.
Here’s the implementation of a repository for managing Employee objects.

interface EmployeeRepository : ReactiveCrudRepository<Employee, Int> {
    @Query("select id, name, salary, organization_id from employee e where e.organization_id = $1")
    fun findByOrganizationId(organizationId: Int) : Flux<Employee>
}

Here’s another implementation of repository – this time for managing Organization objects.

interface OrganizationRepository : ReactiveCrudRepository<Organization, Int> 

Implementing Entities and DTOs

Kotlin provides a convenient way of creating an entity class by declaring it as a data class. When using Spring Data JDBC we have to set the primary key for the entity by annotating a field with @Id. It assumes that the key is automatically incremented by the database. If you are not using auto-increment columns, you have to use a BeforeSaveEvent listener, which sets the ID of the entity. However, when I tried to register such a listener for my entity, it just didn’t work with the reactive version of Spring Data JDBC.
Here’s the implementation of the Employee entity class. It is worth mentioning that Spring Data JDBC automatically maps the class field organizationId to the database column organization_id.

data class Employee(val name: String, val salary: Int, val organizationId: Int) {
    @Id 
    var id: Int? = null
}

Here’s an implementation of Organization entity class.

data class Organization(var name: String) {
    @Id 
    var id: Int? = null
}

R2DBC does not support lists or sets. Because I’d like to return a list of employees inside the Organization object in one of the API endpoints, I have created a DTO containing such a list, as shown below.

data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList<Employee> = ArrayList()
    constructor(employees: MutableList<Employee>) : this(null, "") {
        this.employees = employees
    }
}

The SQL scripts corresponding to the created entities are visible below. The column type serial automatically creates a sequence and attaches it to the id column.

CREATE TABLE employee (
    name character varying NOT NULL,
    salary integer NOT NULL,
    id serial PRIMARY KEY,
    organization_id integer
);
CREATE TABLE organization (
    name character varying NOT NULL,
    id serial PRIMARY KEY
);

Building sample web applications

For demo purposes we will build two independent applications: employee-service and organization-service. The organization-service application communicates with employee-service using the WebFlux WebClient. It fetches the list of employees assigned to an organization and includes them in the response together with the Organization object. The sample application source code is available on GitHub in the sample-spring-data-webflux repository: https://github.com/piomin/sample-spring-data-webflux.
Ok, let’s begin by declaring the Spring Boot main class. We need to enable Spring Data JDBC repositories by annotating the main class with @EnableJdbcRepositories.

@SpringBootApplication
@EnableJdbcRepositories
class EmployeeApplication

fun main(args: Array<String>) {
    runApplication<EmployeeApplication>(*args)
}

Working with R2DBC and Postgres requires some configuration. Probably due to the early stage of development of Spring Data JDBC and R2DBC, there is no Spring Boot auto-configuration for Postgres. We need to declare the connection factory, client, and repository inside a @Configuration class.

@Configuration
class EmployeeConfiguration {

    @Bean
    fun repository(factory: R2dbcRepositoryFactory): EmployeeRepository {
        return factory.getRepository(EmployeeRepository::class.java)
    }

    @Bean
    fun factory(client: DatabaseClient): R2dbcRepositoryFactory {
        val context = RelationalMappingContext()
        context.afterPropertiesSet()
        return R2dbcRepositoryFactory(client, context)
    }

    @Bean
    fun databaseClient(factory: ConnectionFactory): DatabaseClient {
        return DatabaseClient.builder().connectionFactory(factory).build()
    }

    @Bean
    fun connectionFactory(): PostgresqlConnectionFactory {
        val config = PostgresqlConnectionConfiguration.builder() //
                .host("192.168.99.100") //
                .port(5432) //
                .database("reactive") //
                .username("reactive") //
                .password("reactive123") //
                .build()

        return PostgresqlConnectionFactory(config)
    }

}

Finally, we can create REST controllers that contain the definitions of our reactive API methods. With Kotlin it does not take much space. The following controller definition contains three GET methods that allow us to find all employees, all employees assigned to a given organization, or a single employee by id.

@RestController
@RequestMapping("/employees")
class EmployeeController {

    @Autowired
    lateinit var repository : EmployeeRepository

    @GetMapping
    fun findAll() : Flux<Employee> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Employee> = repository.findById(id)

    @GetMapping("/organization/{organizationId}")
    fun findByOrganizationId(@PathVariable organizationId : Int) : Flux<Employee> = repository.findByOrganizationId(organizationId)

    @PostMapping
    fun add(@RequestBody employee: Employee) : Mono<Employee> = repository.save(employee)

}

Inter-service Communication

For the OrganizationController the implementation is a little bit more complicated. Because organization-service communicates with employee-service, we first need to declare a reactive WebFlux WebClient builder.

@Bean
fun clientBuilder() : WebClient.Builder {
   return WebClient.builder()
}

Then, similarly to the repository bean, the builder is injected into the controller. It is used inside the findByIdWithEmployees method for calling the method GET /employees/organization/{organizationId} exposed by employee-service. As you can see in the code fragment below, it provides a reactive API and returns a Flux object containing the list of found employees. This list is injected into the OrganizationDTO object using Reactor’s zipWith method.

@RestController
@RequestMapping("/organizations")
class OrganizationController {

    @Autowired
    lateinit var repository : OrganizationRepository
    @Autowired
    lateinit var clientBuilder : WebClient.Builder

    @GetMapping
    fun findAll() : Flux<Organization> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Organization> = repository.findById(id)

    @GetMapping("/{id}/withEmployees")
    fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
        val employees : Flux<Employee> = clientBuilder.build().get().uri("http://localhost:8090/employees/organization/$id")
                .retrieve().bodyToFlux(Employee::class.java)
        val org : Mono<Organization> = repository.findById(id)
        return org.zipWith(employees.collectList())
                .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

    @PostMapping
    fun add(@RequestBody organization: Organization) : Mono<Organization> = repository.save(organization)

}
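The zipWith step above can be sketched with plain JDK types: CompletableFuture.thenCombine plays the role of zipWith(...).map(...), combining one asynchronous value (the organization) with one asynchronous list (the collected employees) into a single result, without blocking either computation. The record types and zip helper below are illustrative only, not part of the sample project.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Stdlib sketch of the zipWith step: combine one async value with one async
// list into a single result object once both are available.
class ZipSketch {
    record Employee(int id, String name) {}
    record Organization(int id, String name, List<Employee> employees) {}

    static CompletableFuture<Organization> zip(CompletableFuture<Organization> org,
                                               CompletableFuture<List<Employee>> employees) {
        // thenCombine plays the role of zipWith(...).map(tuple -> ...)
        return org.thenCombine(employees,
                (o, e) -> new Organization(o.id(), o.name(), e));
    }
}
```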

How it works?

Before running any tests we need to start a Postgres database. Here’s the Docker command used for running a Postgres container. It creates a user with a password and sets up a default database.

$ docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=reactive -e POSTGRES_PASSWORD=reactive123 -e POSTGRES_DB=reactive postgres

Then we need to create some test tables, so you have to run the SQL script shown in the section Implementing Entities and DTOs. After that you can start our test applications. If you do not override the default settings provided in the application.yml files, employee-service listens on port 8090 and organization-service on port 8095. The following picture illustrates the architecture of our sample system.
spring-data-1
Now, let’s add some test data using the reactive API exposed by the applications.

$ curl -d '{"name":"Test1"}' -H "Content-Type: application/json" -X POST http://localhost:8095/organizations
$ curl -d '{"name":"Name1", "salary":5000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees
$ curl -d '{"name":"Name2", "salary":10000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees

Finally, you can call the method GET /organizations/{id}/withEmployees, for example using your web browser. The result should be similar to the one visible in the following picture.

spring-data-2

The post Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux appeared first on Piotr's TechBlog.

Reactive Microservices with Spring WebFlux and Spring Cloud (Fri, 04 May 2018) https://piotrminkowski.com/2018/05/04/reactive-microservices-with-spring-webflux-and-spring-cloud/

I already described Spring’s reactive support about one year ago in the article Reactive microservices with Spring 5. At that time the Spring WebFlux project was under active development. Now, after the official release of Spring 5, it is worth taking a look at its current version. Moreover, we will try to put our reactive microservices inside the Spring Cloud ecosystem, which contains such elements as service discovery with Eureka, load balancing with Spring Cloud Commons @LoadBalanced, and an API gateway using Spring Cloud Gateway (also based on WebFlux and Netty). We will also check out Spring’s reactive support for NoSQL databases using the example of the Spring Data Reactive Mongo project.

Here’s a figure that illustrates the architecture of our sample system, consisting of two microservices, a discovery server, a gateway, and MongoDB databases. The source code is, as usual, available on GitHub in the sample-spring-cloud-webflux repository.

reactive-1

Let’s describe the steps needed to create the system illustrated above.

Step 1. Building a reactive application using Spring WebFlux

To enable Spring WebFlux for the project we should add the starter spring-boot-starter-webflux to the dependencies. It includes some dependent libraries like Reactor and the Netty server.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

The REST controller looks pretty similar to a controller defined for synchronous web services. The only difference is in the type of the returned objects. Instead of a single object we return an instance of Mono, and instead of a list we return an instance of Flux. Thanks to Spring Data Reactive Mongo we don’t have to do anything more than call the needed method on the repository bean.


@RestController
public class AccountController {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountController.class);

   @Autowired
   private AccountRepository repository;

   @GetMapping("/customer/{customer}")
   public Flux<Account> findByCustomer(@PathVariable("customer") String customerId) {
      LOGGER.info("findByCustomer: customerId={}", customerId);
      return repository.findByCustomerId(customerId);
   }

   @GetMapping
   public Flux<Account> findAll() {
      LOGGER.info("findAll");
      return repository.findAll();
   }

   @GetMapping("/{id}")
   public Mono<Account> findById(@PathVariable("id") String id) {
      LOGGER.info("findById: id={}", id);
      return repository.findById(id);
   }

   @PostMapping
   public Mono<Account> create(@RequestBody Account account) {
      LOGGER.info("create: {}", account);
      return repository.save(account);
   }

}

Step 2. Integrating the application with a database using Spring Data Reactive Mongo

The implementation of the integration between the application and the database is also very simple. First, we need to add the starter spring-boot-starter-data-mongodb-reactive to the project dependencies.

<dependency>
  <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

Support for reactive Mongo repositories is automatically enabled after including the starter. The next step is to declare an entity with ORM mappings. The following class is also returned as a response by AccountController.

@Document
public class Account {

   @Id
   private String id;
   private String number;
   private String customerId;
   private int amount;

   ...

}

Finally, we may create a repository interface that extends ReactiveCrudRepository. It follows the patterns implemented by Spring Data JPA and provides some basic methods for CRUD operations. It also allows you to define methods whose names are automatically mapped to queries. The only difference compared with standard Spring Data JPA repositories is in the method signatures: the returned objects are wrapped in Mono and Flux.

public interface AccountRepository extends ReactiveCrudRepository<Account, String> {

   Flux<Account> findByCustomerId(String customerId);

}

In this example I used a Docker container for running MongoDB locally. Because I run Docker on Windows using Docker Toolbox, the default address of the Docker machine is 192.168.99.100. Here’s the configuration of the data source in the application.yml file.

spring:
  data:
    mongodb:
      uri: mongodb://192.168.99.100/test

Step 3. Enabling service discovery using Eureka

Integration with Spring Cloud Eureka is pretty much the same as for synchronous REST microservices. To enable the discovery client we should first add the starter spring-cloud-starter-netflix-eureka-client to the project dependencies.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

Then we have to enable it using the @EnableDiscoveryClient annotation.

@SpringBootApplication
@EnableDiscoveryClient
public class AccountApplication {

   public static void main(String[] args) {
      SpringApplication.run(AccountApplication.class, args);
   }

}

The microservice will automatically register itself in Eureka. Of course, we may run more than one instance of each service. Here’s a screen illustrating the Eureka Dashboard (http://localhost:8761) after running two instances of account-service and a single instance of customer-service. I would not like to go into the details of running the application with an embedded Eureka server. You may refer to my previous article for details: Quick Guide to Microservices with Spring Boot 2.0, Eureka and Spring Cloud. The Eureka server is available as the discovery-service module.

spring-reactive

Step 4. Inter-service communication between reactive microservices with WebClient

Inter-service communication is realized with the WebClient from the Spring WebFlux project. The same as for RestTemplate, you should annotate it with Spring Cloud Commons’ @LoadBalanced. It enables integration with service discovery and load balancing using the Netflix OSS Ribbon client. So, the first step is to declare a client builder bean with the @LoadBalanced annotation.

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
   return WebClient.builder();
}

Then we may inject the WebClient.Builder into the REST controller. Communication with account-service is implemented inside GET /{id}/with-accounts, where we first search for the customer entity using the reactive Spring Data repository. It returns a Mono object, while the WebClient returns a Flux. Now, our main goal is to merge those two publishers and return a single Mono object with the list of accounts taken from the Flux, without blocking the stream. The following fragment of code illustrates how I used WebClient to communicate with the other microservice, and then merged the response with the result from the repository into a single Mono object. This merge can probably be done in a more “elegant” way, so feel free to create a pull request with your proposal.

@Autowired
private WebClient.Builder webClientBuilder;

@GetMapping("/{id}/with-accounts")
public Mono<Customer> findByIdWithAccounts(@PathVariable("id") String id) {
   LOGGER.info("findByIdWithAccounts: id={}", id);
   Flux<Account> accounts = webClientBuilder.build().get().uri("http://account-service/customer/{customer}", id).retrieve().bodyToFlux(Account.class);
   return accounts
      .collectList()
      .map(a -> new Customer(a))
      .mergeWith(repository.findById(id))
      .collectList()
      .map(CustomerMapper::map);
}

Step 5. Building API gateway using Spring Cloud Gateway

Spring Cloud Gateway is one of the newest Spring Cloud projects. It is built on top of Spring WebFlux, and thanks to that we may use it as a gateway to our sample system based on reactive microservices with Spring Boot. Like Spring WebFlux applications, it runs on an embedded Netty server. To enable it for a Spring Boot application, just add the following dependency to your project.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

We should also enable the discovery client in order to allow the gateway to fetch the list of registered microservices. However, there is no need to register the gateway application itself in Eureka. To disable registration you may set the property eureka.client.registerWithEureka to false inside the application.yml file.

@SpringBootApplication
@EnableDiscoveryClient
public class GatewayApplication {

   public static void main(String[] args) {
      SpringApplication.run(GatewayApplication.class, args);
   }

}

By default, Spring Cloud Gateway does not enable integration with service discovery. To enable it we should set the property spring.cloud.gateway.discovery.locator.enabled to true. Now, the last thing to do is the configuration of routes. Spring Cloud Gateway provides two types of components that may be configured inside routes: filters and predicates. Predicates are used for matching HTTP requests to a route, while filters can be used to modify requests and responses before or after sending the downstream request. Here’s the full gateway configuration. It enables the discovery locator and defines two routes based on entries in the service registry. We use the Path route predicate factory for matching incoming requests, and the RewritePath gateway filter factory for modifying the requested path to adapt it to the format exposed by the downstream services (endpoints are exposed under the path /, while the gateway exposes them under the paths /account and /customer).

spring:
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
      - id: account-service
        uri: lb://account-service
        predicates:
        - Path=/account/**
        filters:
        - RewritePath=/account/(?<path>.*), /$\{path}
      - id: customer-service
        uri: lb://customer-service
        predicates:
        - Path=/customer/**
        filters:
        - RewritePath=/customer/(?<path>.*), /$\{path}
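Under the hood, RewritePath is just a regular-expression replacement with a named capture group whose value is reused in the replacement string (the backslash in /$\{path} only escapes the dollar sign for YAML). A minimal stdlib sketch of the rewrite it performs; the helper below is illustrative only, not Spring Cloud Gateway code.

```java
// Stdlib sketch of what the RewritePath filter does before forwarding downstream:
// capture everything after the /account prefix in a named group and reuse it.
class RewritePathSketch {
    static String rewrite(String requestPath) {
        // mirrors RewritePath=/account/(?<path>.*), /${path}
        return requestPath.replaceAll("/account/(?<path>.*)", "/${path}");
    }
}
```

For example, /account/customer/1 becomes /customer/1, while a non-matching path is left untouched.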

Step 6. Testing reactive microservices with Spring Boot

Before running some tests let’s just recap our sample system. We have two microservices, account-service and customer-service, that use MongoDB as a database. Microservice customer-service calls the endpoint GET /customer/{customer} exposed by account-service. The URL of account-service is taken from Eureka. The whole sample system is hidden behind the gateway, which is available under the address localhost:8090.
Now, the first step is to run MongoDB in a Docker container. After executing the following command Mongo is available under the address 192.168.99.100:27017.

$ docker run -d --name mongo -p 27017:27017 mongo

Then we may proceed to running discovery-service. Eureka is available under its default address localhost:8761. You may run it using your IDE or just by executing the command java -jar target/discovery-service-1.0-SNAPSHOT.jar. The same rule applies to our sample microservices. However, account-service needs to be run in two instances, so you need to override the default HTTP port when running the second instance using the -Dserver.port VM argument, for example java -jar -Dserver.port=2223 target/account-service-1.0-SNAPSHOT.jar. Finally, after running gateway-service we may add some test data.

$ curl --header "Content-Type: application/json" --request POST --data '{"firstName": "John","lastName": "Scott","age": 30}' http://localhost:8090/customer
{"id": "5aec1debfa656c0b38b952b4","firstName": "John","lastName": "Scott","age": 30,"accounts": null}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567890","amount": 5000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e86fa656c11d4c655fb","number": "1234567892","customerId": "5aec1debfa656c0b38b952b4","amount": 5000}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567891","amount": 12000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e91fa656c11d4c655fc","number": "1234567892","customerId": "5aec1debfa656c0b38b952b4","amount": 12000}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567892","amount": 2000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e99fa656c11d4c655fd","number": "1234567892","customerId": "5aec1debfa656c0b38b952b4","amount": 2000}

To test inter-service communication just call the endpoint GET /customer/{id}/with-accounts on gateway-service. It forwards the request to customer-service, which then calls the endpoint exposed by account-service using the reactive WebClient. The result is visible below.

reactive-2

Conclusion

Since Spring 5 and Spring Boot 2.0 there is a full range of available ways to build a microservices-based architecture. We can build a standard synchronous system using one-to-one communication with the Spring Cloud Netflix project, messaging microservices based on a message broker and the publish/subscribe communication model with Spring Cloud Stream, and finally asynchronous, reactive microservices with Spring WebFlux. The main goal of this article is to show you how to use Spring WebFlux together with Spring Cloud projects in order to provide mechanisms like service discovery, load balancing, or an API gateway for reactive microservices built on top of Spring Boot. Before Spring 5, the lack of support for reactive microservices was one of the drawbacks of the Spring framework, but with Spring WebFlux this is no longer the case. Moreover, we may leverage Spring’s reactive support for the most popular NoSQL databases like MongoDB or Cassandra, and easily place our reactive microservices inside one system together with synchronous REST microservices.

The post Reactive Microservices with Spring WebFlux and Spring Cloud appeared first on Piotr's TechBlog.
