Spring WebFlux Archives - Piotr's TechBlog
https://piotrminkowski.com/tag/spring-webflux/
Java, Spring, Kotlin, microservices, Kubernetes, containers

Reactive Spring Boot with WebFlux, R2DBC and Postgres
Fri, 28 Jul 2023 | https://piotrminkowski.com/2023/07/28/reactive-spring-boot-with-webflux-r2dbc-and-postgres/
In this article, you will learn how to implement and test reactive Spring Boot apps using Spring WebFlux, R2DBC, and a Postgres database. We will create two simple apps written in Kotlin using the latest version of Spring Boot 3. Our apps expose some REST endpoints over HTTP. In order to test the communication between them and the integration with the Postgres database, we will use Testcontainers and the MockServer Netty module.

If you are looking for more guides to Spring Boot 3, you can take a look at other posts on my blog. For example, I describe how to build a microservices architecture with Spring Boot 3 and Spring Cloud, as well as the latest changes in observability with Spring Boot 3 and how to integrate your app with the Grafana stack. Of course, these are just a few examples – you can find more content in this area on my blog.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. It contains two apps inside employee-service and organization-service directories. After that, you should just follow my instructions.

Dependencies

In the first step, we will add several dependencies related to Kotlin. Besides the standard libraries, we can include Kotlin support for Jackson (JSON serialization/deserialization):

<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-reflect</artifactId>
</dependency>

We also need to include two Spring Boot starters. In order to create a reactive Spring @Controller, we need the Spring WebFlux module. With the Spring Boot Data R2DBC starter we can use Spring Data repositories in a reactive way. Finally, we have to include the Postgres driver provided by R2DBC.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <scope>runtime</scope>
</dependency>

There are several testing dependencies in our project. We need to include the standard Spring Boot Test starter; Testcontainers with JUnit 5, Postgres, and R2DBC support; and finally the MockServer Netty module for mocking a reactive API. It is also worth adding the spring-boot-testcontainers module to take advantage of the built-in integration between Spring Boot and Testcontainers.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>r2dbc</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-testcontainers</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.mock-server</groupId>
  <artifactId>mockserver-netty</artifactId>
  <version>5.15.0</version>
  <scope>test</scope>
</dependency>

The last dependency is optional. We can include Spring Boot Actuator in our apps. It adds the R2DBC connection status to health checks and several metrics describing the connection pool status.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Implement the Spring Reactive App

Here’s our model class for the first app – employee-service:

class Employee(val name: String, 
               val salary: Int, 
               val organizationId: Int) {
    @Id var id: Int? = null
}

Here’s the repository interface. It needs to extend the R2dbcRepository interface. As with standard Spring Data repositories, we can define several derived find methods. However, instead of returning entities directly, they wrap the results in Reactor’s Mono or Flux types.

interface EmployeeRepository: R2dbcRepository<Employee, Int> {
    fun findByOrganizationId(organizationId: Int): Flux<Employee>
}

Here’s the implementation of our @RestController. We need to inject the EmployeeRepository bean. Then we use the repository bean to interact with the database in a reactive way. Our endpoints also return objects wrapped by the Reactor Mono and Flux. There are three find endpoints and a single POST endpoint:

  • Searching all the employees (1)
  • Searching by the employee id (2)
  • Searching all employees by the organization id (3)
  • Adding new employees (4)
@RestController
@RequestMapping("/employees")
class EmployeeController {

   @Autowired
   lateinit var repository : EmployeeRepository

   @GetMapping // (1)
   fun findAll() : Flux<Employee> = repository.findAll()

   @GetMapping("/{id}") // (2)
   fun findById(@PathVariable id: Int) : Mono<Employee> = 
      repository.findById(id)

   @GetMapping("/organization/{organizationId}") // (3)
   fun findByOrganizationId(@PathVariable organizationId: Int): 
      Flux<Employee> = repository.findByOrganizationId(organizationId)

   @PostMapping // (4)
   fun add(@RequestBody employee: Employee) : Mono<Employee> = 
      repository.save(employee)

}
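For reference, a request body accepted by the POST /employees endpoint (4) could look like the following JSON document (the field values are illustrative):

```json
{
  "name": "John Smith",
  "salary": 5000,
  "organizationId": 1
}
```

The id field is omitted because it is assigned by the database on insert and returned in the response.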

We also need to configure database connection settings in the Spring Boot application.yml:

spring:
  application:
    name: employee-service
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/spring
    username: spring
    password: spring123

Here’s our main class. We want our app to create a table in the database on startup. Since R2DBC does not initialize the schema automatically, we need to register a ConnectionFactoryInitializer bean that populates the database from the schema.sql file.

@SpringBootApplication
class EmployeeApplication {

   @Bean
   fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? {
      val initializer = ConnectionFactoryInitializer()
      initializer.setConnectionFactory(connectionFactory)
      initializer.setDatabasePopulator(
         ResourceDatabasePopulator(ClassPathResource("schema.sql")))
      return initializer
   }

}

fun main(args: Array<String>) {
   runApplication<EmployeeApplication>(*args)
}

Then just place the schema.sql file in the src/main/resources directory.

CREATE TABLE employee (
  id SERIAL PRIMARY KEY, 
  name VARCHAR(255), 
  salary INT, 
  organization_id INT
);

Let’s switch to the organization-service. The implementation is pretty similar. Here’s our domain model class:

class Organization(var name: String) {
    @Id var id: Int? = null
}

Our app is communicating with the employee-service. Therefore, we need to define the WebClient bean. It gets the address of the target service from application properties.

@SpringBootApplication
class OrganizationApplication {

   @Bean
   fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer? {
      val initializer = ConnectionFactoryInitializer()
      initializer.setConnectionFactory(connectionFactory)
      initializer.setDatabasePopulator(
         ResourceDatabasePopulator(ClassPathResource("schema.sql")))
      return initializer
   }

   @Value("\${employee.client.url}")
   private lateinit var employeeUrl: String

   @Bean
   fun webClient(builder: WebClient.Builder): WebClient {
      return builder.baseUrl(employeeUrl).build()
   }
}

fun main(args: Array<String>) {
    runApplication<OrganizationApplication>(*args)
}
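For completeness, the organization-service also needs an application.yml that defines the employee.client.url property injected above. A possible configuration (the employee-service port and the database credentials are assumptions consistent with the rest of this article):

```yaml
spring:
  application:
    name: organization-service
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/spring
    username: spring
    password: spring123

employee:
  client:
    url: http://localhost:8080
```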

There is also the repository interface OrganizationRepository. Our @RestController uses a repository bean to interact with the database and the WebClient bean to call the endpoint exposed by the employee-service. The GET /organizations/{id}/with-employees endpoint returns an OrganizationDTO as the response.

@RestController
@RequestMapping("/organizations")
class OrganizationController {

   @Autowired
   lateinit var repository : OrganizationRepository
   @Autowired
   lateinit var client : WebClient

   @GetMapping
   fun findAll() : Flux<Organization> = repository.findAll()

   @GetMapping("/{id}")
   fun findById(@PathVariable id : Int): Mono<Organization> = 
      repository.findById(id)

   @GetMapping("/{id}/with-employees")
   fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
      val employees : Flux<Employee> = client.get().uri("/employees/organization/$id")
             .retrieve().bodyToFlux(Employee::class.java)
      val org : Mono<Organization> = repository.findById(id)
      return org.zipWith(employees.collectList()).log()
             .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

   @PostMapping
   fun add(@RequestBody organization: Organization) : Mono<Organization> = 
      repository.save(organization)

}

Here’s the implementation of our DTO:

data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList<Employee> = ArrayList()
    constructor(employees: MutableList<Employee>) : this(null, "") {
        this.employees = employees
    }
    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name) {
        this.employees = employees
    }
}
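For illustration, a response from the GET /organizations/{id}/with-employees endpoint serialized from this DTO could look like the following (the values are hypothetical):

```json
{
  "id": 1,
  "name": "Test",
  "employees": [
    { "id": 1, "name": "John Smith", "salary": 5000, "organizationId": 1 },
    { "id": 2, "name": "Jane Doe", "salary": 6000, "organizationId": 1 }
  ]
}
```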

Integration Testing

Once we have finished the implementation, we can prepare several integration tests. As I mentioned at the beginning, we will use Testcontainers to run the Postgres container during the tests. Our test runs the app and leverages the auto-configured instance of WebTestClient to call the API endpoints (1). We need to start the Postgres container before the tests, so we define the container bean inside the companion object section (2). With the @ServiceConnection annotation we don’t have to set the connection properties manually – Spring Boot will do it for us (3).

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
class EmployeeControllerTests {

   @Autowired
   private lateinit var webTestClient: WebTestClient // (1)

   companion object { // (2)

      @Container
      @ServiceConnection // (3)
      val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
         withDatabaseName("spring")
         withUsername("spring")
         withPassword("spring123")
      }

   }

   @Test
   @Order(1)
   fun shouldStart() {

   }

   @Test
   @Order(2)
   fun shouldAddEmployee() {
      webTestClient.post().uri("/employees")
          .contentType(MediaType.APPLICATION_JSON)
          .bodyValue(Employee("Test", 1000, 1))
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindEmployee() {
      webTestClient.get().uri("/employees/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindEmployees() {
      webTestClient.get().uri("/employees")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody().jsonPath("$.length()").isEqualTo(1)
          .jsonPath("$[0].id").isNotEmpty
   }

}

The test class for the organization-service is a little bit more complicated, because we need to mock the communication with the employee-service. In order to do that, we use the ClientAndServer object (1). It is started once before all the tests (2) and stopped after the tests (3). We mock the GET /employees/organization/{id} endpoint, which is invoked by the organization-service (4). Then we call the organization-service GET /organizations/{id}/with-employees endpoint (5). Finally, we verify that it returns the list of employees inside the JSON response.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
class OrganizationControllerTests {

   @Autowired
   private lateinit var webTestClient: WebTestClient

   companion object {

      @Container
      @ServiceConnection
      val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
         withDatabaseName("spring")
         withUsername("spring")
         withPassword("spring123")
      }

      private var mockServer: ClientAndServer? = null // (1)

      @BeforeAll
      @JvmStatic
      internal fun beforeAll() { // (2)
         mockServer = ClientAndServer.startClientAndServer(8090)
      }

      @AfterAll
      @JvmStatic
      internal fun afterAll() { // (3)
         mockServer!!.stop()
      }
   }

   @Test
   @Order(1)
   fun shouldStart() {

   }

   @Test
   @Order(2)
   fun shouldAddOrganization() {
        webTestClient.post().uri("/organizations")
          .contentType(MediaType.APPLICATION_JSON)
          .bodyValue(Organization("Test"))
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganization() {
        webTestClient.get().uri("/organizations/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganizations() {
        webTestClient.get().uri("/organizations")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody().jsonPath("$.length()").isEqualTo(1)
          .jsonPath("$[0].id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganizationWithEmployees() { // (4)
        mockServer!!.`when`(request()
               .withMethod("GET")
               .withPath("/employees/organization/1"))
            .respond(response()
                .withStatusCode(200)
                .withContentType(MediaType.APPLICATION_JSON)
                .withBody(createEmployees()))

      webTestClient.get().uri("/organizations/1/with-employees")
          .accept(MediaType.APPLICATION_JSON) // (5)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
          .jsonPath("$.employees.length()").isEqualTo(2)
          .jsonPath("$.employees[0].id").isEqualTo(1)
          .jsonPath("$.employees[1].id").isEqualTo(2)
   }

   private fun createEmployees(): String {
      val employees: List<Employee> = listOf<Employee>(
         Employee(1, "Test1", 10000, 1),
         Employee(2, "Test2", 20000, 1)
      )
      return jacksonObjectMapper().writeValueAsString(employees)
   }
}

You can easily verify that all the tests finish successfully by running them on your laptop. After cloning the repository, you need to have Docker running and build the apps with the following Maven command:

$ mvn clean package

We can also prepare the build definition for our apps on CircleCI. Since we need to run Testcontainers, we need a machine executor with a Docker daemon. Here’s the configuration of the build pipeline inside the .circleci/config.yml file:

version: 2.1

jobs:
  build:
    docker:
      - image: 'cimg/openjdk:20.0'
    steps:
      - checkout
      - run:
          name: Analyze on SonarCloud
          command: mvn verify sonar:sonar -DskipTests

executors:
  machine_executor_amd64:
    machine:
      image: ubuntu-2204:2022.04.2
    environment:
      architecture: "amd64"
      platform: "linux/amd64"

orbs:
  maven: circleci/maven@1.4.1

workflows:
  maven_test:
    jobs:
      - maven/test:
          executor: machine_executor_amd64
      - build:
          context: SonarCloud

Here’s the result of the build on CircleCI:

[screenshot: CircleCI build results]

If you have Docker running you can also start our Spring Boot reactive apps with the Postgres container. It is possible thanks to the spring-boot-testcontainers module. There is a dedicated @TestConfiguration class that may be used to run Postgres in dev mode:

@TestConfiguration
class PostgresContainerDevMode {

    @Bean
    @ServiceConnection
    fun postgresql(): PostgreSQLContainer<*>? {
        return PostgreSQLContainer("postgres:14.0")
            .withUsername("spring")
            .withPassword("spring123")
    }
}

Now, we need to define the “test” main class that uses the configuration provided within the PostgresContainerDevMode class.

class EmployeeApplicationTest

fun main(args: Array<String>) {
    fromApplication<EmployeeApplication>()
       .with(PostgresContainerDevMode::class)
       .run(*args)
}

In order to run the app in dev mode with Postgres on Docker, just execute the following Maven command:

$ mvn spring-boot:test-run

The post Reactive Spring Boot with WebFlux, R2DBC and Postgres appeared first on Piotr's TechBlog.

SSL with Spring WebFlux and Vault PKI
Mon, 24 May 2021 | https://piotrminkowski.com/2021/05/24/ssl-with-spring-webflux-and-vault-pki/
In this article, you will learn how to configure the Vault PKI engine and integrate it with Spring WebFlux. With Vault PKI you can easily generate X.509 certificates signed by the CA. Then your application can obtain a certificate through a REST API; its TTL is relatively short, and it is unique per application instance. Also, we can use Spring VaultTemplate to simplify integration with the Vault API.

Let’s say a little bit more about Vault. It allows you to secure, store, and control access to tokens, passwords, certificates, and encryption keys using a UI, CLI, or HTTP API. It is a really powerful tool. With Vault, instead of a traditional approach, you can manage your security in a more dynamic, cloud-native way. For example, you can integrate Vault with a database backend and then generate a user login and password on the fly. Moreover, for Spring Boot applications you can take advantage of the Spring Cloud Vault project. If you are interested in more information about it, read my article Testing Spring Boot Integration with Vault and Postgres using Testcontainers.

That’s not all. You can integrate Vault with other tools from Hashicorp like Consul or Nomad. In other words, it allows us to build a cloud-native platform in a secure way. For more details please refer to the article Secure Spring Cloud Microservices with Vault and Nomad.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that, you need to clone my repository sample-spring-cloud-security. Then you should go to the gateway-service directory and follow my instructions in the next sections. The sample application acts as an API gateway for microservices and uses Spring Cloud Gateway. Since it is built on top of Spring WebFlux, that example fits this article perfectly.

1. Running Vault

We will run Vault inside a Docker container in development mode. A server running in that mode does not require any further setup; it is ready to use just after startup, and our instance of Vault is available on port 8200. The version of Vault used in this article is 1.7.1.

$ docker run --cap-add=IPC_LOCK -d --name vault -p 8200:8200 vault

It is possible to log in using different methods, but the most suitable way for us is through a token. To do that, we have to display the container logs using the command docker logs vault, and then copy the Root Token as shown below.

[screenshot: Vault container logs with the Root Token]

Finally, we are able to login to the Vault web console.

[screenshot: Vault web console]

2. Enable and configure Vault PKI

There are two ways to enable and configure Vault PKI: with CLI or via UI. Most articles describe a list of CLI commands required to configure the PKI engine. One of them is available on the Hashicorp site. However, I’m going to use Vault UI for that. So first, let’s enable a new engine on the main site.

[screenshot: enabling a new secrets engine in the Vault UI]

Then, we need to choose a type of engine to enable. In our case it is the option PKI Certificates.

During creation, let’s keep the default name pki. Then, we need to navigate into the newly enabled engine and create a new role. A role is used for generating certificates. The name of my role is default. This name is important because we will refer to it from the code using VaultTemplate.

[screenshot: creating the PKI role]

The type of the key used in our role is rsa.

Before creating it, we should set some important parameters. One of them is TTL, which is set to 3 days. Also, don’t forget to check fields Allow any name and Require Common Name. Both of them are related to the CN field inside the certificate. Because we will store a username inside the CN field, we need to allow any name for it.

Once the role is created, we need to configure the CA. To do that, we should first switch to the Configuration tab and then click the Configure button.

After that, let’s choose Configure CA.

Finally, we can create a new CA certificate. We should leave root as the CA Type and internal as the Type. The default key format is pem. We can also set a Common Name for the CA certificate. For both the role and the CA, it is worth filling in additional fields, e.g. the organization or organizational unit name.

3. Integrating Spring WebFlux with Vault PKI

Let’s begin with dependencies. We need to include the Spring WebFlux starter for the reactive API and the Spring Security starter to secure it. Integration with the Vault API is provided by spring-vault-core. I also had to include the Jackson libraries in order to be able to start the application with Spring Vault.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-core</artifactId>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.vault</groupId>
   <artifactId>spring-vault-core</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-security</artifactId>
</dependency>

Then, let’s configure a VaultTemplate bean. It should use the http scheme and an authentication token injected from the configuration.

@Value("${vault.token}")
private String vaultToken;

@Bean
VaultTemplate vaultTemplate() {
   VaultEndpoint e = new VaultEndpoint();
   e.setScheme("http");
   return new VaultTemplate(e, new TokenAuthentication(vaultToken));
}

The VaultTemplate provides dedicated support for interaction with the PKI engine. We just need to call the opsForPki method, passing the PKI engine name, to obtain a VaultPkiOperations instance (1). Then we build a certificate request with VaultCertificateRequest. We may set several parameters, of which the most important are the CN and the certificate TTL (2). Finally, we invoke the issueCertificate method, passing the request and the name of the role configured in Vault PKI (3). The generated certificate, the CA certificate, and a private key are available inside the CertificateBundle object obtained from the response (4).

private CertificateBundle issueCertificate() throws Exception {
   VaultPkiOperations pkiOperations = vaultTemplate.opsForPki("pki"); // (1)
   VaultCertificateRequest request = VaultCertificateRequest.builder()
        .ttl(Duration.ofHours(12))
        .commonName("localhost")
        .build(); // (2)
   VaultCertificateResponse response = pkiOperations.issueCertificate("default", request); // (3)
   CertificateBundle certificateBundle = response.getRequiredData(); // (4)

   log.info("Cert-SerialNumber: {}", certificateBundle.getSerialNumber());
   return certificateBundle;
}

4. Enable Spring WebFlux security

We have already integrated Spring WebFlux with Vault PKI in the previous section. Finally, we can proceed to the last step of our implementation – enabling security based on X.509 certificates. To do that, we need to create a @Configuration class annotated with @EnableWebFluxSecurity (1). We also need to obtain a username from the certificate by configuring a principal extractor. We are going to use SubjectDnX509PrincipalExtractor (2) with a regex that reads data from the CN field. The final configuration disables CSRF and basic auth, and enables SSL with X.509 certificates (3). We also need to provide an implementation of the UserDetails interface (4) with a single username, piotrm.

@Configuration
@EnableWebFluxSecurity // (1)
public class SecurityConfig {

   @Bean
   public SecurityWebFilterChain filterChain(ServerHttpSecurity http) {
      SubjectDnX509PrincipalExtractor principalExtractor =
             new SubjectDnX509PrincipalExtractor(); // (2)
      principalExtractor.setSubjectDnRegex("CN=(.*?)(?:,|$)");

      return http.csrf().disable()
             .authorizeExchange(exchanges -> 
                    exchanges.anyExchange().authenticated())
             .x509()
                .principalExtractor(principalExtractor)
             .and()
                .httpBasic().disable().build(); // (3)
   }

   @Bean
   public MapReactiveUserDetailsService users() { // (4)
      UserDetails user1 = User.builder()
             .username("piotrm")
             .password("{noop}1234")
             .roles("USER")
             .build();
      return new MapReactiveUserDetailsService(user1);
   }
}
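The SubjectDnX509PrincipalExtractor above is configured with the regex CN=(.*?)(?:,|$). Independently of Spring, we can verify with plain java.util.regex that this pattern captures the CN value from a certificate subject DN (the DN string below is a made-up example):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class CnExtractorDemo {
    public static void main(String[] args) {
        // The same regex passed to setSubjectDnRegex(...)
        Pattern cnPattern = Pattern.compile("CN=(.*?)(?:,|$)");
        String subjectDn = "CN=piotrm,OU=users,O=example";

        Matcher matcher = cnPattern.matcher(subjectDn);
        if (matcher.find()) {
            // group(1) holds the username stored in the CN field
            System.out.println(matcher.group(1)); // prints "piotrm"
        }
    }
}
```

The non-greedy group plus the (?:,|$) alternation makes the pattern stop at the first comma, so it also works for a DN that contains only a CN field.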

In the last step, we need to override the Netty server SSL configuration at runtime. Our customizer should implement the WebServerFactoryCustomizer interface parameterized with NettyReactiveWebServerFactory. Inside the customize method, we first invoke the issueCertificate method responsible for generating a certificate in Vault (you can refer to the previous section for the implementation of that method) (1). The CertificateBundle contains all the required data. We can invoke the createKeyStore method on it to create a keystore (2) and then save it to a file (3).

To override Netty SSL settings, we use the Ssl object. Client authentication needs to be enabled (4). We also set the location of the newly created keystore (5). After that, we may proceed to the truststore creation. The issuer certificate may be obtained from the CertificateBundle (6). Then we create a new keystore and set the CA certificate as an entry there (7). Finally, we save the truststore to a file (8) and set its location in the Ssl object (9).

@Component
@Slf4j
public class GatewayServerCustomizer implements 
         WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {

   @SneakyThrows
   @Override
   public void customize(NettyReactiveWebServerFactory factory) {
      String keyAlias = "vault";
      CertificateBundle bundle = issueCertificate(); // (1)
      KeyStore keyStore = bundle.createKeyStore(keyAlias); // (2)
      String keyStorePath = saveKeyStoreToFile("server-key.pkcs12", keyStore); // (3)

      Ssl ssl = new Ssl();
      ssl.setEnabled(true);
      ssl.setClientAuth(Ssl.ClientAuth.NEED); // (4)

      ssl.setKeyStore(keyStorePath); // (5)
      ssl.setKeyAlias(keyAlias);
      ssl.setKeyStoreType(keyStore.getType());
      ssl.setKeyPassword("");
      ssl.setKeyStorePassword("123456");

      X509Certificate caCert = bundle.getX509IssuerCertificate(); // (6)
      log.info("CA-SerialNumber: {}", caCert.getSerialNumber());
      KeyStore trustStore = KeyStore.getInstance("pkcs12");
      trustStore.load(null, null);
      trustStore.setCertificateEntry("ca", caCert); // (7)
      String trustStorePath = saveKeyStoreToFile("server-trust.pkcs12", trustStore); // (8)

      ssl.setTrustStore(trustStorePath); // (9)
      ssl.setTrustStorePassword("123456");
      ssl.setTrustStoreType(trustStore.getType());

      factory.setSsl(ssl);
      factory.setPort(8443);
   }
}
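Steps (7) and (8) rely only on the standard JDK KeyStore API. Stripped of the Vault specifics, creating an empty PKCS12 store and serializing it with a password can be sketched as follows (the password is the same placeholder used above; setting the CA entry is shown only as a comment because it requires a real certificate object):

```java
import java.io.ByteArrayOutputStream;
import java.security.KeyStore;

public class TrustStoreDemo {
    public static void main(String[] args) throws Exception {
        // Create an empty PKCS12 keystore; load(null, null) initializes it
        KeyStore trustStore = KeyStore.getInstance("pkcs12");
        trustStore.load(null, null);

        // In the customizer, the CA certificate from the bundle is added here:
        // trustStore.setCertificateEntry("ca", caCert);

        // Serialize the store protected by a password, as saveKeyStoreToFile does
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        trustStore.store(out, "123456".toCharArray());
        System.out.println("truststore bytes: " + out.size());
    }
}
```

In the article's customizer, the serialized bytes are written to server-trust.pkcs12 and the path is handed to ssl.setTrustStore.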

5. Testing Spring WebFlux with Vault PKI

Let’s run our sample application. It is available on port 8443. We will test it using the curl tool. Before doing that, we need to generate a client certificate with a private key. Let’s go to the Vault UI once again. If you click the default role, the Vault UI redirects you to a form responsible for certificate generation. In the Common Name field, we should provide the test username configured inside the UserDetails implementation. For me, it is piotrm. Also, don’t forget to set the right TTL.

After generating a certificate, you will be redirected to the site with the results. First, you should copy the string with your certificate and save it to a file. For me, it is piotrm.crt. You can also display the content of the generated private key. Then, do the same as with the certificate. My filename is piotrm.key.

Finally, we can send a test request to our sample application passing the names of key and certificate files.

$ curl https://localhost:8443/hello -v --key piotrm.key --cert piotrm.crt

The post SSL with Spring WebFlux and Vault PKI appeared first on Piotr's TechBlog.

A Deep Dive Into Spring WebFlux Threading Model https://piotrminkowski.com/2020/03/30/a-deep-dive-into-spring-webflux-threading-model/ https://piotrminkowski.com/2020/03/30/a-deep-dive-into-spring-webflux-threading-model/#comments Mon, 30 Mar 2020 11:35:58 +0000 http://piotrminkowski.com/?p=7878 If you are building reactive applications with Spring WebFlux, typically you will use Reactor Netty as a default embedded server. Reactor Netty is currently one of the most popular asynchronous event-driven frameworks. It provides non-blocking and backpressure-ready TCP, HTTP, and UDP clients and servers. In fact, the most important difference between synchronous and reactive frameworks […]

If you are building reactive applications with Spring WebFlux, typically you will use Reactor Netty as the default embedded server. Reactor Netty is currently one of the most popular asynchronous event-driven frameworks. It provides non-blocking and backpressure-ready TCP, HTTP, and UDP clients and servers. In fact, the most important difference between synchronous and reactive frameworks lies in their threading and concurrency model. Without understanding how a reactive framework handles threads, you won’t fully understand reactivity. Let’s take a closer look at the threading model realized by Spring WebFlux and Project Reactor.
We should begin with the following fragment of the Spring Framework Reference.

On a “vanilla” Spring WebFlux server (for example, no data access nor other optional dependencies), you can expect one thread for the server and several others for request processing (typically as many as the number of CPU cores).

So, the number of processing threads that handle incoming requests is related to the number of CPU cores. If you have 4 cores and haven’t enabled the hyper-threading mechanism, you will have 4 worker threads in the pool. You can determine the number of cores available to the JVM by using the static availableProcessors method of the Runtime class: Runtime.getRuntime().availableProcessors().

Example

At this point, it is worth referring to the example application. It is a simple Spring Boot application that exposes a reactive API built using Spring WebFlux. The source code is available on GitHub: https://github.com/piomin/sample-spring-webflux.git. You can run it after building with the java -jar command, or simply run it from your IDE. It also contains a Dockerfile in the root directory, so you can build and run it inside a Docker container.

Limit CPU

With Docker you can easily limit the number of CPU cores “visible” to your application. When running a Docker container you just need to set the cpus parameter as shown below.

$ docker run -d --name webflux --cpus="1" -p 8080:8080 piomin/sample-spring-webflux

The sample application prints the number of available CPU cores during startup. Here’s the fragment of code responsible for it.

@SpringBootApplication
public class SampleSpringWebFluxApp {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxApp.class);

    public static void main(String[] args) {
        SpringApplication.run(SampleSpringWebFluxApp.class, args);
    }

    @PostConstruct
    public void init() {
        LOGGER.info("CPU: {}", Runtime.getRuntime().availableProcessors());
    }

}

Now, we can send some test requests to one of the available endpoints.

$ curl http://192.168.99.100:8080/persons/json

After that, we may take a look at the logs generated by the application using the docker logs -f webflux command. The result is pretty unexpected. We still have four worker threads in the pool, although the application uses only a single CPU core. Here’s the screen with the application logs.

spring-webflux-threading-model-docker

The explanation is pretty simple. Here’s the fragment of Reactor Netty source code that illustrates the algorithm used. The minimum number of worker threads in the pool is 4.

spring-webflux-threading-model-reactor-netty

This exactly explains why my application uses the same number of threads when running on my laptop with 4 CPUs and in the Docker container limited to a single CPU core. You should also remember that Java has been able to see the number of CPU cores inside a Docker container since version 10. If you use Java 8 on Docker, your application sees the CPUs of the host machine, not those of the container, which may have a huge impact on the consumed resources.
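The sizing rule described above can be reproduced with a small JDK-only sketch. This is my own simplification of the behavior, not Reactor Netty’s actual code; the system property name reactor.netty.ioWorkerCount is the one Reactor Netty documents for overriding the default.

```java
public class WorkerCount {

    public static void main(String[] args) {
        // Default event-loop sizing: max(available CPU cores, 4).
        // Reactor Netty also allows overriding it via the
        // "reactor.netty.ioWorkerCount" system property - reproduced here for illustration.
        int cores = Runtime.getRuntime().availableProcessors();
        int workers = Integer.parseInt(System.getProperty("reactor.netty.ioWorkerCount",
                String.valueOf(Math.max(cores, 4))));
        System.out.println("worker threads: " + workers);
    }
}
```

With --cpus="1" the JVM reports a single core, but the expression still yields 4, which matches the four reactor-http-nio-* threads visible in the logs above.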

WebClient Thread Pool

Let’s consider the following implementation of a reactive endpoint. First, we emit a stream of three Person objects, and then we merge it with the stream returned by the WebClient.

@Autowired
WebClient client;

private Stream<Person> prepareStreamPart1() {
   return Stream.of(
      new Person(1, "Name01", "Surname01", 11),
      new Person(2, "Name02", "Surname02", 22),
      new Person(3, "Name03", "Surname03", 33)
   );
}

@GetMapping("/integration/{param}")
public Flux<Person> findPersonsIntegration(@PathVariable("param") String param) {
   return Flux.fromStream(this::prepareStreamPart1).log()
      .mergeWith(
         client.get().uri("/slow/" + param)
            .retrieve()
            .bodyToFlux(Person.class)
            .log()
      );
}

In order to properly understand what exactly happens with the thread pool, let’s recall the following fragment from the Spring Framework Reference.

The reactive WebClient operates in event loop style. So you can see a small, fixed number of processing threads related to that (for example, reactor-http-nio- with the Reactor Netty connector). However, if Reactor Netty is used for both client and server, the two share event loop resources by default.

Typically, you have 4 worker threads, which are shared between server-side request processing and client-side communication with other resources. Since we have enabled detailed logging with the log method, we may easily analyse, step by step, the thread pooling for a single request sent to the test endpoint. The incoming request is processed by the reactor-http-nio-2 thread. Then WebClient is called, and it subscribes to the result stream in the same thread. But the result is published on a different thread, reactor-http-nio-3, the first available in the pool.

spring-webflux-threading-model-webclient-logs

We can also analyze the subsequent logs for a single selected thread. In the following fragment of logs for the thread reactor-http-nio-1, you may see that WebClient does not block the thread while waiting for a result from the called resource. The onComplete method is called here four times in a row without any intervening onSubscribe call. The idea of non-blocking threads is the main concept behind reactive programming, which allows us to handle many requests using a relatively small number of threads (4 in this case).


15:02:25.875 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.875 --- [ctor-http-nio-1] : request(32)
15:02:25.891 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:25.891 --- [ctor-http-nio-1] : | request(32)
15:02:25.891 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:25.892 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:25.892 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:25.892 --- [ctor-http-nio-1] : | onComplete()
15:02:25.894 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.895 --- [ctor-http-nio-1] : request(32)
15:02:25.924 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:25.925 --- [ctor-http-nio-1] : | request(32)
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:25.926 --- [ctor-http-nio-1] : | onComplete()
15:02:25.927 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.927 --- [ctor-http-nio-1] : request(32)
15:02:26.239 --- [ctor-http-nio-1] : onNext(Person(id=38483, firstName=Name6, lastName=Surname6, age=50))
15:02:26.241 --- [ctor-http-nio-1] : onNext(Person(id=59103, firstName=Name6, lastName=Surname6, age=6))
15:02:26.241 --- [ctor-http-nio-1] : onNext(Person(id=67587, firstName=Name6, lastName=Surname6, age=36))
15:02:26.242 --- [ctor-http-nio-1] : onComplete()
15:02:26.283 --- [ctor-http-nio-1] : onNext(Person(id=40006, firstName=Name2, lastName=Surname2, age=41))
15:02:26.284 --- [ctor-http-nio-1] : onNext(Person(id=94727, firstName=Name2, lastName=Surname2, age=79))
15:02:26.287 --- [ctor-http-nio-1] : onNext(Person(id=19891, firstName=Name2, lastName=Surname2, age=84))
15:02:26.287 --- [ctor-http-nio-1] : onComplete()
15:02:26.292 --- [ctor-http-nio-1] : onNext(Person(id=67550, firstName=Name14, lastName=Surname14, age=92))
15:02:26.293 --- [ctor-http-nio-1] : onNext(Person(id=85063, firstName=Name14, lastName=Surname14, age=76))
15:02:26.293 --- [ctor-http-nio-1] : onNext(Person(id=54572, firstName=Name14, lastName=Surname14, age=18))
15:02:26.293 --- [ctor-http-nio-1] : onComplete()
15:02:26.295 --- [ctor-http-nio-1] : onNext(Person(id=49896, firstName=Name16, lastName=Surname16, age=20))
15:02:26.295 --- [ctor-http-nio-1] : onNext(Person(id=85684, firstName=Name16, lastName=Surname16, age=39))
15:02:26.296 --- [ctor-http-nio-1] : onNext(Person(id=2605, firstName=Name16, lastName=Surname16, age=75))
15:02:26.296 --- [ctor-http-nio-1] : onComplete()
15:02:26.337 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:26.338 --- [ctor-http-nio-1] : | request(32)
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:26.340 --- [ctor-http-nio-1] : | onComplete()

Long Response Time

If you are using WebClient for communication with other resources, you should be able to handle long response times. Of course, WebClient does not block the thread, but sometimes it is desirable to use a thread pool other than the main worker pool shared with the server. To do that, you should use the publishOn method. Project Reactor provides thread pool abstractions called Schedulers, which you may use to create different concurrency strategies. If you prefer to have full control over the minimum and maximum number of threads in the pool, you should define your own task executor as shown below. The minimum size of the pool is 5, while the maximum size is 10.

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
   ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
   executor.setCorePoolSize(5);
   executor.setMaxPoolSize(10);
   executor.setQueueCapacity(100);
   executor.setThreadNamePrefix("slow-");
   executor.initialize();
   return executor;
}

Then you just need to pass it as the scheduler for the publishOn method. Here’s our second test endpoint, which sets a dedicated thread pool for WebClient.

@GetMapping("/integration-in-different-pool/{param}")
public Flux<Person> findPersonsIntegrationInDifferentPool(@PathVariable("param") String param) {
   return Flux.fromStream(this::prepareStreamPart1).log()
      .mergeWith(
         client.get().uri("/slow/" + param)
            .retrieve()
            .bodyToFlux(Person.class)
            .log()
            .publishOn(Schedulers.fromExecutor(taskExecutor))
         );
}

Performance Testing

Now, our main goal is to compare the performance of both implemented endpoints. The first of them uses the same thread pool for server request processing and for WebClient, while the second creates a separate thread pool for WebClient with publishOn. The first step in preparing such a comparison test is to create a mock with a delayed response. We will use the MockWebServer provided by OkHttp. It catches requests sent to the /slow/{param} endpoint and emits a stream of Person objects delayed by 200 ms.

public static MockWebServer mockBackEnd;

@BeforeClass
public static void setUp() throws IOException {
   final Dispatcher dispatcher = new Dispatcher() {

      @NotNull
      @Override
      public MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws InterruptedException {
         String pathParam = recordedRequest.getPath().replaceAll("/slow/", "");
         List<Person> personsPart2 = List.of(new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)),
            new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)),
            new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)));
         try {
            return new MockResponse()
               .setResponseCode(200)
               .setBody(mapper.writeValueAsString(personsPart2))
               .setHeader("Content-Type", "application/json")
               .setBodyDelay(200, TimeUnit.MILLISECONDS);
         }
         catch (JsonProcessingException e) {
            // Returning an error response instead of null avoids an NPE inside MockWebServer
            return new MockResponse().setResponseCode(500);
         }
      }
   };
   mockBackEnd = new MockWebServer();
   mockBackEnd.setDispatcher(dispatcher);
   mockBackEnd.start();
   System.setProperty("target.uri", "http://localhost:" + mockBackEnd.getPort());
}

@AfterClass
public static void tearDown() throws IOException {
   mockBackEnd.shutdown();
}

The last step in our implementation is to create the test methods. For calling both sample endpoints we can use the synchronous TestRestTemplate, since it doesn’t impact the results. For benchmarking I’m using the junit-benchmarks library, which allows me to define the number of concurrent threads and the maximum number of attempts. To enable that library for a JUnit test, we just need to define a @Rule. Here’s the implementation of our performance test. We are running Spring Boot in test mode and then starting 50 concurrent threads with 300 attempts.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@RunWith(SpringRunner.class)
public class PerformanceSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(PerformanceSpringWebFluxTest.class);
    private static ObjectMapper mapper = new ObjectMapper();
    private static Random r = new Random();
    private static int i = 0;

    @Rule
    public TestRule benchmarkRun = new BenchmarkRule();
    @Autowired
    TestRestTemplate template;
   
    @Test
    @BenchmarkOptions(warmupRounds = 10, concurrency = 50, benchmarkRounds = 300)
    public void testPerformance() throws InterruptedException {
        ResponseEntity<Person[]> r = template.exchange("/persons/integration/{param}", HttpMethod.GET, null, Person[].class, ++i);
        Assert.assertEquals(200, r.getStatusCodeValue());
        Assert.assertNotNull(r.getBody());
        Assert.assertEquals(6, r.getBody().length);
    }

    @Test
    @BenchmarkOptions(warmupRounds = 10, concurrency = 50, benchmarkRounds = 300)
    public void testPerformanceInDifferentPool() throws InterruptedException {
        ResponseEntity<Person[]> r = template.exchange("/persons/integration-in-different-pool/{param}", HttpMethod.GET, null, Person[].class, ++i);
        Assert.assertEquals(200, r.getStatusCodeValue());
        Assert.assertNotNull(r.getBody());
        Assert.assertEquals(6, r.getBody().length);
    }
}

Here are the results. For the endpoint with the default Spring WebFlux threading model, which shares a thread pool between server processing and client requests, processing all 300 requests takes ~5.5s. For the method with a separate thread pool for WebClient, it takes ~2.9s for all 300 requests. In general, the more threads you use in the test, the more the ratio of average processing times between the first and the second model changes in favor of the second model (with a separate thread pool for the client).

spring-webflux-threading-model-testresult1

And the result for the separate WebClient thread pool.

spring-webflux-threading-model-testresult2
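As a rough sanity check of these numbers (my own arithmetic, not part of the original benchmark): with 50 concurrent callers, 300 requests, and a 200 ms mock body delay per backend call, a perfectly parallel run would need at least 6 batches of 200 ms.

```java
public class BenchmarkFloor {

    public static void main(String[] args) {
        int requests = 300;
        int concurrency = 50;
        double delaySeconds = 0.2; // MockWebServer body delay
        // Ideal lower bound: requests are served in full batches of `concurrency`,
        // each batch waiting out one backend delay.
        double floor = Math.ceil((double) requests / concurrency) * delaySeconds;
        System.out.printf("theoretical floor: %.1f s%n", floor);
    }
}
```

Both measured times (~5.5s and ~2.9s) sit well above this 1.2s floor, so the difference between them reflects scheduling and thread-pool contention rather than the backend delay itself.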

Using Spring Boot Actuator

Sometimes you need to use additional libraries in your WebFlux application. One of them is Spring Boot Actuator, which may be especially required if you have to expose health check or metrics endpoints. To find out how Spring WebFlux handles threading for Spring Boot Actuator, we will profile our application with YourKit. After running the application, you should generate some traffic to the /actuator/health endpoint. Here’s the screen with threads from YourKit. As you can see, a worker thread (reactor-http-nio-*) delegates long-running jobs to threads available in the bounded elastic pool (boundedElastic-*).

spring-webflux-threading-model-yourkit

Threads from the bounded elastic pool are checking free disk space, which is by default part of the Spring Boot Actuator /health endpoint.

spring-webflux-threading-model-diskspace
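If the disk-space indicator is not relevant for your deployment, it can be switched off so that the health endpoint does no file-system work on the bounded elastic threads. A sketch using the standard Spring Boot Actuator property (verify the exact property name against your Spring Boot version):

```properties
# Disable the disk-space health indicator (enabled by default)
management.health.diskspace.enabled=false
```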

The post A Deep Dive Into Spring WebFlux Threading Model appeared first on Piotr's TechBlog.

Using Reactive WebClient with Spring WebFlux https://piotrminkowski.com/2019/11/04/using-reactive-webclient-with-spring-webflux/ Mon, 04 Nov 2019 10:12:52 +0000

Reactive APIs, and reactive programming in general, have become increasingly popular lately. You can observe more and more new frameworks and toolkits supporting reactive programming, or even dedicated to it. Today, in the era of microservices architecture, network communication through APIs is critical for applications. Therefore, reactive APIs seem to be an attractive alternative to the traditional, synchronous approach. They should definitely be considered as a primary approach if you are working with large streams of data exchanged via network communication.

Spring supports reactive programming and reactive APIs too. You may have read about it in some of my previous articles, where I focused on introducing that support. In the article Reactive Microservices with Spring WebFlux and Spring Cloud you can read more about building a microservices architecture using Spring WebFlux and Spring Cloud. In turn, in the articles Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux and Reactive Elasticsearch with Spring Boot I have introduced reactive Spring Data repositories using PostgreSQL and Elasticsearch as examples. Those articles should be treated as an introduction to reactive programming with Spring. Today, I would like to go deeper into that topic and discuss some aspects of the network communication between a service that exposes a reactive stream via its API and a service that consumes this API using Spring WebClient.

1. Access reactive stream using Spring WebClient

First, let’s consider the typical scenario of reading a reactive API on the consumer side. We have the following implementation of a reactive stream containing Person objects on the producer side:

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping("/json")
    public Flux<Person> findPersonsJson() {
        return Flux.fromStream(this::prepareStream)
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

We can easily access it with the non-blocking Spring WebFlux WebClient. The following test starts a sample Spring WebFlux application, defines a WebClient instance, and subscribes to the response stream.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsJson() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/json").retrieve().bodyToFlux(Person.class);
        persons.subscribe(person -> {
            waiter.assertNotNull(person);
            LOGGER.info("Client subscribes: {}", person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }
}

In reactive programming with Reactor and Spring WebFlux, you first need to subscribe to the stream in order to access the emitted objects. Assuming that our test stream has 9 Person elements, you will receive the following log output:

spring-webflux-webclient-1

Let’s think about what happened here. Our reactive stream on the server side has been returned to the consumer as a JSON array response. It means that our reactive client can start reading the stream only after the emission of elements has completed on the server side. That’s not exactly what we wanted to achieve, because we would like to take advantage of reactive streams also on the client side and be able to read every element of the stream just after it has been emitted by the producer. But with the application/json content type it is just not possible, which is clearly visible in the fragment of the response below.

spring-webflux-webclient-3

2. Expose event stream using Spring WebFlux

The problem defined in the previous section obviously lies on the server side. Spring WebFlux WebClient is able to read reactive streams continuously, but the producer needs to emit them “properly”. To achieve this, we should set the content type to application/stream+json or text/event-stream.

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping(value = "/stream", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> findPersonsStream() {
        return Flux.fromStream(this::prepareStream).delaySequence(Duration.ofMillis(100))
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

Now, our response looks slightly different than before as shown below.

spring-webflux-webclient-4
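To make the difference concrete, here is roughly what the two response bodies look like (illustrative payloads with shortened fields, not captured from the app):

```
// application/json - a single array, readable only once the whole stream completes:
[{"id":1,"firstName":"Name01"},{"id":2,"firstName":"Name02"},{"id":3,"firstName":"Name03"}]

// application/stream+json - one JSON document per line, readable as each element is emitted:
{"id":1,"firstName":"Name01"}
{"id":2,"firstName":"Name02"}
{"id":3,"firstName":"Name03"}
```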

To see the effect during the test, I delayed the whole stream a little (around 100 milliseconds), because it takes some time before the subscriber starts to receive elements after subscribing to the stream. The current test is the same as the previous one: we subscribe to the response stream and print all the elements.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsStream() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/stream").retrieve().bodyToFlux(Person.class);
        persons.subscribe(person -> {
            LOGGER.info("Client subscribes: {}", person);
            waiter.assertNotNull(person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }
}

Here are the results. As you can see, the elements are processed on the client side just after being emitted by the producer. What is worth noting: all the elements are sent within the same thread.

webclient-2

3. Implementing backpressure

Backpressure is one of the most important reasons you would decide to use reactive programming. According to the Spring WebFlux documentation, it supports backpressure, since Project Reactor is a Reactive Streams library and, therefore, all of its operators support non-blocking back pressure. That sentence is of course true, but only on the server side. Maybe the next fragment of the documentation will shed some light on things: Reactor has a strong focus on server-side Java. We should remember that Spring WebClient and WebFlux use TCP transport for communication between a client and the server. Therefore, the client is not able to regulate the frequency of element emission on the server side. I don’t want to go into the details right now; for an explanation of that situation you may refer to the following post https://stackoverflow.com/questions/52244808/backpressure-mechanism-in-spring-web-flux.
Ok, before proceeding let’s recap the definition of the backpressure term. Backpressure (or back pressure) is resistance or force opposing the desired flow of data through software. In simple words, if a producer sends more events than a consumer is able to handle in a specific period of time, the consumer should be able to regulate the frequency with which the producer sends events. Let’s consider the following test example.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class SampleSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxTest.class);
    final WebClient client = WebClient.builder().baseUrl("http://localhost:8080").build();

    @Test
    public void testFindPersonsStreamBackPressure() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flux<Person> persons = client.get().uri("/persons/stream/back-pressure").retrieve().bodyToFlux(Person.class);
        persons.map(this::doSomeSlowWork).subscribe(person -> {
            waiter.assertNotNull(person);
            LOGGER.info("Client subscribes: {}", person);
            waiter.resume();
        });
        waiter.await(3000, 9);
    }

    private Person doSomeSlowWork(Person person) {
        try {
            Thread.sleep(90);
        }
        catch (InterruptedException e) { }
        return person;
    }
}

After receiving a stream of elements, our test calls a time-expensive mapping method on each element. So, it is not able to handle as many elements as are sent by the producer. In this case, the only way to somehow “regulate” backpressure is through the delayElements method on the server side. I also tried to use the limitRate method on the server side and to implement my own custom Subscriber on the client side, but I wasn’t successful. Here’s the current implementation of our API method for returning streams of Person objects.

@RestController
@RequestMapping("/persons")
public class PersonController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonController.class);

    @GetMapping(value = "/stream/back-pressure", produces = MediaType.APPLICATION_STREAM_JSON_VALUE)
    public Flux<Person> findPersonsStreamBackPressure() {
        return Flux.fromStream(this::prepareStream).delayElements(Duration.ofMillis(100))
                .doOnNext(person -> LOGGER.info("Server produces: {}", person));
    }
}

After running the test, we can see that every element is delayed by 100 milliseconds, and also that the producer uses the whole pool of threads for emitting the stream of objects.

webclient-3
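To close this section: in-process, the demand-driven contract from the backpressure definition above does work exactly as described. It can be illustrated without any Spring dependency, using the JDK’s own java.util.concurrent.Flow API (a minimal sketch; class and variable names are mine):

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class BackpressureSketch {

    public static void main(String[] args) throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1);
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        publisher.subscribe(new Flow.Subscriber<Integer>() {
            private Flow.Subscription subscription;

            @Override
            public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                s.request(1); // the consumer asks for exactly one element
            }

            @Override
            public void onNext(Integer item) {
                System.out.println("consumed: " + item);
                subscription.request(1); // request the next one only when ready
            }

            @Override
            public void onError(Throwable t) { done.countDown(); }

            @Override
            public void onComplete() { done.countDown(); }
        });
        for (int i = 1; i <= 3; i++) {
            publisher.submit(i); // submit() blocks if the subscriber's buffer is full
        }
        publisher.close();
        done.await();
    }
}
```

The producer cannot outrun the consumer here, because each element is delivered only against an explicit request(1). As explained above, it is precisely this per-element demand signal that does not cross the HTTP/TCP boundary between WebClient and the server.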

Source Code

The source code of the sample application and JUnit tests is, as always, available on GitHub. The repository address is https://github.com/piomin/sample-spring-webflux.git.

The post Using Reactive WebClient with Spring WebFlux appeared first on Piotr's TechBlog.

Performance Comparison Between Spring MVC vs Spring WebFlux with Elasticsearch https://piotrminkowski.com/2019/10/30/performance-comparison-between-spring-mvc-and-spring-webflux-with-elasticsearch/ Wed, 30 Oct 2019 14:50:38 +0000

Since Spring 5 and Spring Boot 2, there is full support for reactive REST APIs with the Spring WebFlux project. Also, the Spring Data project systematically adds support for reactive NoSQL databases, and recently for SQL databases too. Since Spring Data Moore we can take advantage of a reactive template and repository for Elasticsearch, which I have already described in one of my previous articles, Reactive Elasticsearch With Spring Boot.
Recently, we can observe the rising popularity of reactive programming and reactive APIs. This fact has led me to perform a comparison between a synchronous API built on top of Spring MVC and a reactive API built with Spring WebFlux. The comparison will cover server-side memory usage and the average response time on the client side. We will also use Spring Data Elasticsearch repositories, accessed by the controller, for integration with an instance of Elasticsearch running in a Docker container. To make the test objective, we will of course use the same versions of the Spring Boot and Spring Data projects. First, let’s consider some prerequisites.

1. Dependencies

We are using Spring Boot in version 2.2.0.RELEASE with JDK 11.

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.2.0.RELEASE</version>
   <relativePath/>
</parent>
<properties>
   <java.version>11</java.version>
</properties>

Here’s the list of dependencies for the application with synchronous REST API:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

And here’s for the application reactive API:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

2. Running Elasticsearch

We will run the same Docker container for both tests. The container is started in development mode as a single node.

$ docker run -d --name elasticsearch -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:6.6.2

We will insert the initial set of data into Elasticsearch.

public class SampleDataSet {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleDataSet.class);
    private static final String INDEX_NAME = "sample";
    private static final String INDEX_TYPE = "employee";
    private static int COUNTER = 0;

    @Autowired
    ElasticsearchTemplate template;
    @Autowired
    TaskExecutor taskExecutor;

    @PostConstruct
    public void init() {
        if (!template.indexExists(INDEX_NAME)) {
            template.createIndex(INDEX_NAME);
            LOGGER.info("New index created: {}", INDEX_NAME);
        }
        for (int i = 0; i < 10000; i++) {
            taskExecutor.execute(() -> bulk());
        }
    }

    public void bulk() {
        try {
            ObjectMapper mapper = new ObjectMapper();
            List<IndexQuery> queries = new ArrayList<>();
            List<Employee> employees = employees();
            for (Employee employee : employees) {
                IndexQuery indexQuery = new IndexQuery();
                indexQuery.setSource(mapper.writeValueAsString(employee));
                indexQuery.setIndexName(INDEX_NAME);
                indexQuery.setType(INDEX_TYPE);
                queries.add(indexQuery);
            }
            if (queries.size() > 0) {
                template.bulkIndex(queries);
            }
            template.refresh(INDEX_NAME);
            LOGGER.info("BulkIndex completed: {}", ++COUNTER);
        } catch (Exception e) {
            LOGGER.error("Error bulk index", e);
        }
    }

    private List<Employee> employees() {
        List<Employee> employees = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            Random r = new Random();
            Employee employee = new Employee();
            employee.setName("JohnSmith" + r.nextInt(1000000));
            employee.setAge(r.nextInt(100));
            employee.setPosition("Developer");
            int departmentId = r.nextInt(500000);
            employee.setDepartment(new Department((long) departmentId, "TestD" + departmentId));
            int organizationId = departmentId / 100;
            employee.setOrganization(new Organization((long) organizationId, "TestO" + organizationId, "Test Street No. " + organizationId));
            employees.add(employee);
        }
        return employees;
    }

}
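A side note on the generated data: since organizationId is derived as departmentId / 100 and departmentId is drawn from [0, 500000), the organization ids fall into [0, 5000). This is why the search tests later query names like "TestO" + r.nextInt(5000). A quick check of that bound:

```java
public class DataBoundsCheck {
    public static void main(String[] args) {
        int maxDepartmentId = 500_000;            // exclusive bound used by r.nextInt(500000)
        int maxOrganizationId = (maxDepartmentId - 1) / 100;
        System.out.println(maxOrganizationId);    // 4999, so organization ids are in [0, 5000)
    }
}
```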

We are testing against a single document type, Employee:

@Document(indexName = "sample", type = "employee")
public class Employee {

    @Id
    private String id;
    @Field(type = FieldType.Object)
    private Organization organization;
    @Field(type = FieldType.Object)
    private Department department;
    private String name;
    private int age;
    private String position;
   
}

The data set shouldn’t be too large, but also not too small. Let’s test a node with around 18M documents divided into 5 shards.

[image: elastic-perf-1]

3. Synchronous API Tests

The library used for performance tests is junit-benchmarks. It allows us to define the number of concurrent threads and the number of repetitions for a JUnit test method.

<dependency>
   <groupId>com.carrotsearch</groupId>
   <artifactId>junit-benchmarks</artifactId>
   <version>0.7.2</version>
   <scope>test</scope>
</dependency>

The implementation of the JUnit test class is visible below. It should extend the AbstractBenchmark class and define the test rule BenchmarkRule. The tests are performed against the running external application available under localhost:8080 using TestRestTemplate. We have three test scenarios. In the first one, implemented inside addTest, we verify the time required for adding a new document to Elasticsearch through the POST method. The other two scenarios, defined in the methods findByNameTest and findByOrganizationNameTest, test search methods. Each test is running in 30 concurrent threads and repeated 500 times.

public class EmployeeRepositoryPerformanceTest extends AbstractBenchmark {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeRepositoryPerformanceTest.class);

    @Rule
    public TestRule benchmarkRun = new BenchmarkRule();

    private TestRestTemplate template = new TestRestTemplate();
    private Random r = new Random();

    @Test
    @BenchmarkOptions(concurrency = 30, benchmarkRounds = 500, warmupRounds = 2)
    public void addTest() {
        Employee employee = new Employee();
        employee.setName("John Smith");
        employee.setAge(r.nextInt(100));
        employee.setPosition("Developer");
        employee.setDepartment(new Department((long) r.nextInt(1000), "TestD"));
        employee.setOrganization(new Organization((long) r.nextInt(100), "TestO", "Test Street No. 1"));
        employee = template.postForObject("http://localhost:8080/employees", employee, Employee.class);
        Assert.assertNotNull(employee);
        Assert.assertNotNull(employee.getId());
    }

    @Test
    @BenchmarkOptions(concurrency = 30, benchmarkRounds = 500, warmupRounds = 2)
    public void findByNameTest() {
        String name = "JohnSmith" + r.nextInt(1000000);
        Employee[] employees = template.getForObject("http://localhost:8080/employees/{name}", Employee[].class, name);
        LOGGER.info("Found: {}", employees.length);
        Assert.assertNotNull(employees);
    }

    @Test
    @BenchmarkOptions(concurrency = 30, benchmarkRounds = 500, warmupRounds = 2)
    public void findByOrganizationNameTest() {
        String organizationName = "TestO" + r.nextInt(5000);
        Employee[] employees = template.getForObject("http://localhost:8080/employees/organization/{organizationName}", Employee[].class, organizationName);
        LOGGER.info("Found: {}", employees.length);
        Assert.assertNotNull(employees);
    }

}

4. Reactive API Tests

For the reactive API we have the same scenarios, but they have to be implemented a little differently, since we are dealing with an asynchronous, non-blocking API. First, we will use a handy library called concurrentunit for testing multi-threaded and asynchronous code.

<dependency>
   <groupId>net.jodah</groupId>
   <artifactId>concurrentunit</artifactId>
   <version>0.4.6</version>
   <scope>test</scope>
</dependency>

The ConcurrentUnit library allows us to define a Waiter object, which is responsible for performing assertions and waiting for operations in any thread, and then notifying the main test thread. We are also using WebClient, which is able to retrieve reactive streams defined as Flux and Mono.

public class EmployeeRepositoryPerformanceTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeRepositoryPerformanceTest.class);

    @Rule
    public TestRule benchmarkRun = new BenchmarkRule();

    private final Random r = new Random();
    private final WebClient client = WebClient.builder()
            .baseUrl("http://localhost:8080")
            .defaultHeader(HttpHeaders.CONTENT_TYPE, "application/json")
            .build();

    @Test
    @BenchmarkOptions(concurrency = 30, benchmarkRounds = 500, warmupRounds = 2)
    public void addTest() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Employee employee = new Employee();
        employee.setName("John Smith");
        employee.setAge(r.nextInt(100));
        employee.setPosition("Developer");
        employee.setDepartment(new Department((long) r.nextInt(10), "TestD"));
        employee.setOrganization(new Organization((long) r.nextInt(10), "TestO", "Test Street No. 1"));
        Mono<Employee> empMono = client.post().uri("/employees").body(Mono.just(employee), Employee.class).retrieve().bodyToMono(Employee.class);
        empMono.subscribe(employeeLocal -> {
            waiter.assertNotNull(employeeLocal);
            waiter.assertNotNull(employeeLocal.getId());
            waiter.resume();
        });
        waiter.await(5000);
    }

    @Test
    @BenchmarkOptions(concurrency = 30, benchmarkRounds = 500, warmupRounds = 2)
    public void findByNameTest() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        String name = "JohnSmith" + r.nextInt(1000000);
        Flux<Employee> employees = client.get().uri("/employees/{name}", name).retrieve().bodyToFlux(Employee.class);
        employees.count().subscribe(count -> {
            waiter.assertTrue(count > 0);
            waiter.resume();
            LOGGER.info("Found({}): {}", name, count);
        });
        waiter.await(5000);
    }

    @Test
    @BenchmarkOptions(concurrency = 30, benchmarkRounds = 500, warmupRounds = 2)
    public void findByOrganizationNameTest() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        String organizationName = "TestO" + r.nextInt(5000);
        Flux<Employee> employees = client.get().uri("/employees/organization/{organizationName}", organizationName).retrieve().bodyToFlux(Employee.class);
        employees.count().subscribe(count -> {
            waiter.assertTrue(count > 0);
            waiter.resume();
            LOGGER.info("Found: {}", count);
        });
        waiter.await(5000);
    }

}
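The Waiter follows the same wait/notify pattern the JDK offers with CountDownLatch. If you don’t want an extra dependency, a minimal stdlib sketch of the same wait/resume idea (assuming a 5s timeout, as in the tests above) could look like this:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class LatchBasedWaiter {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        final String[] result = new String[1];
        // the worker thread plays the role of the subscribe() callback
        new Thread(() -> {
            result[0] = "done";   // capture the value to assert on
            latch.countDown();    // equivalent of waiter.resume()
        }).start();
        // equivalent of waiter.await(5000)
        if (!latch.await(5000, TimeUnit.MILLISECONDS)) {
            throw new AssertionError("timed out waiting for the async callback");
        }
        System.out.println(result[0]); // done
    }
}
```

Note that, unlike Waiter, a plain latch does not propagate assertion failures from the worker thread back to the test thread, and that is exactly the gap ConcurrentUnit fills.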

5. Spring MVC vs Spring WebFlux – Test Results

After discussing some prerequisites and implementation details we may finally proceed to the tests. I think that the results are pretty interesting. Let’s begin with Spring MVC tests. Here are graphs that illustrate memory usage during the tests. The first of them shows heap memory usage.

[image: spring-mvc-vs-webflux-elastic-perf-2]

The second shows metaspace.

[image: spring-mvc-vs-webflux-elastic-perf-3]

Here are the equivalent graphs for the reactive API tests. The heap memory usage is a little higher than in the previous tests, although generally Netty requires less memory than Tomcat (50MB instead of 100MB before running the test).

[image: elastic-perf-6]

The metaspace usage is a little lower than for synchronous API tests (60MB vs 75MB).

[image: spring-mvc-vs-webflux-elastic-perf-7]

And now the processing time test results. They may be a little unexpected. In fact, there is no big difference between the synchronous and reactive tests. One thing should be explained here: the method findByName returns a smaller set of employees than findByOrganizationName. That’s why it is much faster than the method for searching by organization name.

[image: spring-mvc-vs-webflux-elastic-perf-4]

As I mentioned before, the results are pretty much the same, especially for the POST method. The result for findByName is 6.2s instead of 7.1s for synchronous calls, which gives a difference of around 15%. The test for findByOrganizationName failed due to exceeding the 5s timeout defined for every single run of the test method. It seems that processing around 3-4k objects in a single response significantly slowed down the sample application based on Spring WebFlux and reactive Elasticsearch repositories.
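A quick sanity check of the quoted difference between 6.2s (reactive) and 7.1s (synchronous) for findByName:

```java
public class DiffCheck {
    public static void main(String[] args) {
        double sync = 7.1;      // seconds, synchronous findByName
        double reactive = 6.2;  // seconds, reactive findByName
        double diffPct = (sync - reactive) / reactive * 100.0;
        System.out.printf("%.1f%%%n", diffPct); // 14.5%, roughly the 15% mentioned above
    }
}
```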

[image: elastic-perf-5]

Summary

I won’t discuss the results of these tests; I’ll leave the conclusions to you. The source code repository is available on GitHub: https://github.com/piomin/sample-spring-elasticsearch. The master branch contains the version for Spring MVC tests, while the reactive branch contains the version for Spring WebFlux tests.

The post Performance Comparison Between Spring MVC vs Spring WebFlux with Elasticsearch appeared first on Piotr's TechBlog.

Reactive Logging With Spring WebFlux and Logstash https://piotrminkowski.com/2019/10/15/reactive-logging-with-spring-webflux-and-logstash/ https://piotrminkowski.com/2019/10/15/reactive-logging-with-spring-webflux-and-logstash/#comments Tue, 15 Oct 2019 11:12:57 +0000 https://piotrminkowski.wordpress.com/?p=7346 I have already introduced my Spring Boot library for synchronous HTTP request/response logging in one of my previous articles Logging with Spring Boot and Elastic Stack. This library is dedicated to synchronous REST applications built with Spring MVC and Spring Web. Since version 5.0 Spring Framework also offers support for reactive REST API through the […]

I have already introduced my Spring Boot library for synchronous HTTP request/response logging in one of my previous articles Logging with Spring Boot and Elastic Stack. This library is dedicated to synchronous REST applications built with Spring MVC and Spring Web. Since version 5.0 Spring Framework also offers support for reactive REST API through the Spring WebFlux project. I decided to extend support for logging in my library to reactive Spring WebFlux.
The repository with the source code is available on GitHub: https://github.com/piomin/spring-boot-logging.git. It consists of two Maven modules: logstash-logging-spring-boot-starter for synchronous logging, and reactive-logstash-logging-spring-boot-starter for reactive Spring WebFlux applications. The library is available on Maven Central:

<dependency>
  <groupId>com.github.piomin</groupId>
  <artifactId>reactive-logstash-logging-spring-boot-starter</artifactId>
  <version>1.0.0.RELEASE</version>
</dependency>

Motivations

Although we are working with reactive APIs and streams, the requirement to log every incoming request and outgoing response is still relevant. Today we usually run many small applications communicating with each other, so we focus on storing the logs in a single, central place. Here come Logstash and the Elastic Stack. Spring Boot and Spring WebFlux allow you to build reactive microservices fast. My library takes care of gathering HTTP request/response logs and sending them to ELK with proper tags and correlation. Using it in your application does not require any additional source code. You just need to include the library.
However, some things need to be discussed when talking about reactive logging. Because we are logging full requests with payloads, we need to buffer them. This somewhat goes against reactive programming, where we try to be efficient with the available resources. Also, the integration with Logstash is realized synchronously. It is worth keeping those two things in mind when using reactive-logstash-logging-spring-boot-starter in your application.

Implementation Details

Spring WebFlux Dependencies

Since the library is used for logging Spring Boot reactive APIs, it needs to have Spring WebFlux in the dependencies. In turn, Spring WebFlux is built on top of Project Reactor, so the reactor-core artifact also has to be on the dependency list. We also need some standard Spring libraries, used for example to provide auto-configuration.

<dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-context</artifactId>
   <version>${spring.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.springframework</groupId>
   <artifactId>spring-webflux</artifactId>
   <version>${spring.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-autoconfigure</artifactId>
   <version>${spring.boot.version}</version>
   <scope>provided</scope>
</dependency>
<dependency>
   <groupId>io.projectreactor</groupId>
   <artifactId>reactor-core</artifactId>
   <version>3.3.0.RELEASE</version>
   <scope>provided</scope>
</dependency>

Spring WebFlux Reactive Interceptor

With Spring WebFlux we don’t have the popular Spring MVC components for caching request/response bodies: ContentCachingRequestWrapper and ContentCachingResponseWrapper. However, the approach is pretty similar to the one applied when building the library for synchronous logging. We need to access the request and response body by wrapping it and buffering it without consuming the stream. To do that, we first need to create classes extending ServerHttpRequestDecorator and ServerHttpResponseDecorator. They give us access to the message body while Spring WebFlux is reading from and writing to the stream.
When extending ServerHttpRequestDecorator we need to override getBody. Keep in mind that we cannot block a reactive stream, so one of the doOn* methods is suitable for accessing it. The body is published as a Flux containing DataBuffer objects. Inside the asynchronous doOnNext callback we write the buffer to a temporary byte array.

public class RequestLoggingInterceptor extends ServerHttpRequestDecorator {

   private static final Logger LOGGER = LoggerFactory.getLogger(RequestLoggingInterceptor.class);

   private final boolean logHeaders;

   public RequestLoggingInterceptor(ServerHttpRequest delegate, boolean logHeaders) {
      super(delegate);
      this.logHeaders = logHeaders;
   }

   @Override
   public Flux<DataBuffer> getBody() {
      ByteArrayOutputStream baos = new ByteArrayOutputStream();
      return super.getBody().doOnNext(dataBuffer -> {
         try {
            Channels.newChannel(baos).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
            String body = IOUtils.toString(baos.toByteArray(), "UTF-8");
            if (logHeaders)
               LOGGER.info("Request: method={}, uri={}, headers={}, payload={}, audit={}", getDelegate().getMethod(),
                     getDelegate().getPath(), getDelegate().getHeaders(), body, value("audit", true));
            else
               LOGGER.info("Request: method={}, uri={}, payload={}, audit={}", getDelegate().getMethod(),
                     getDelegate().getPath(), body, value("audit", true));
         } catch (IOException e) {
            LOGGER.error("Error reading request body", e);
         } finally {
            try {
               baos.close();
            } catch (IOException e) {
               LOGGER.error("Error closing buffer stream", e);
            }
         }
      });
   }

}
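The key trick in getBody is copying each buffer through a read-only view, so the downstream consumer still sees an unconsumed buffer. The same channel-copy can be demonstrated in isolation with a plain ByteBuffer (a simplified stand-in for DataBuffer.asByteBuffer()):

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;

public class BufferCopyDemo {
    public static void main(String[] args) throws Exception {
        ByteBuffer dataBuffer = ByteBuffer.wrap("{\"name\":\"John\"}".getBytes(StandardCharsets.UTF_8));
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        // writing the read-only duplicate advances its own position, not the original's
        Channels.newChannel(baos).write(dataBuffer.asReadOnlyBuffer());
        System.out.println(new String(baos.toByteArray(), StandardCharsets.UTF_8)); // {"name":"John"}
        System.out.println(dataBuffer.position()); // 0, the original buffer remains fully readable
    }
}
```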

When extending ServerHttpResponseDecorator we need to override the writeWith method, responsible for writing the body to the output reactive stream. We will listen for body writing events in doOnNext. Then we access the DataBuffer and buffer it in a ByteArrayOutputStream.

public class ResponseLoggingInterceptor extends ServerHttpResponseDecorator {

   private static final Logger LOGGER = LoggerFactory.getLogger(ResponseLoggingInterceptor.class);

   private long startTime;
   private boolean logHeaders;

   public ResponseLoggingInterceptor(ServerHttpResponse delegate, long startTime, boolean logHeaders) {
      super(delegate);
      this.startTime = startTime;
      this.logHeaders = logHeaders;
   }

   @Override
   public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
      Flux<DataBuffer> buffer = Flux.from(body);
      return super.writeWith(buffer.doOnNext(dataBuffer -> {
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         try {
            Channels.newChannel(baos).write(dataBuffer.asByteBuffer().asReadOnlyBuffer());
            String bodyRes = IOUtils.toString(baos.toByteArray(), "UTF-8");
            if (logHeaders)
               LOGGER.info("Response({} ms): status={}, headers={}, payload={}, audit={}", value("X-Response-Time", System.currentTimeMillis() - startTime),
                     value("X-Response-Status", getStatusCode().value()), getDelegate().getHeaders(), bodyRes, value("audit", true));
            else
               LOGGER.info("Response({} ms): status={}, payload={}, audit={}", value("X-Response-Time", System.currentTimeMillis() - startTime),
                     value("X-Response-Status", getStatusCode().value()), bodyRes, value("audit", true));
         } catch (IOException e) {
            e.printStackTrace();
         } finally {
            try {
               baos.close();
            } catch (IOException e) {
               e.printStackTrace();
            }
         }
      }));
   }
}

Spring WebFlux Logging Filter

To be able to decorate requests and responses, we first need to declare a filter intercepting incoming requests. To do that, we have to declare a bean that implements WebFilter and its method filter(...). The filtering method gives access to the exchange object, which contains the objects representing the request and response. So if we would like to decorate the request and response objects, we first need to decorate ServerWebExchange. We may easily do it by defining an instance of the ServerWebExchangeDecorator object with overridden methods getRequest and getResponse. Our decorators are responsible just for listening to events related to message body processing. The significant consequence is that if a message has an empty body, the listening methods won’t be fired. That’s why I decided to add code analyzing the content length, in order to log a request or response message with an empty body. It is based on the HTTP header Content-Length.

public class ReactiveSpringLoggingFilter implements WebFilter {

   private static final Logger LOGGER = LoggerFactory.getLogger(ReactiveSpringLoggingFilter.class);
   private UniqueIDGenerator generator;
   private String ignorePatterns;
   private boolean logHeaders;
   private boolean useContentLength;

   public ReactiveSpringLoggingFilter(UniqueIDGenerator generator, String ignorePatterns, boolean logHeaders, boolean useContentLength) {
      this.generator = generator;
      this.ignorePatterns = ignorePatterns;
      this.logHeaders = logHeaders;
      this.useContentLength = useContentLength;
   }

   @Override
   public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
      if (ignorePatterns != null && exchange.getRequest().getURI().getPath().matches(ignorePatterns)) {
         return chain.filter(exchange);
      } else {
         generator.generateAndSetMDC(exchange.getRequest());
         final long startTime = System.currentTimeMillis();
         List<String> header = exchange.getRequest().getHeaders().get("Content-Length");
         if (useContentLength && (header == null || header.isEmpty() || header.get(0).equals("0"))) {
            if (logHeaders)
               LOGGER.info("Request: method={}, uri={}, headers={}, audit={}", exchange.getRequest().getMethod(),
                     exchange.getRequest().getURI().getPath(), exchange.getRequest().getHeaders(), value("audit", true));
            else
               LOGGER.info("Request: method={}, uri={}, audit={}", exchange.getRequest().getMethod(),
                     exchange.getRequest().getURI().getPath(), value("audit", true));
         }
         ServerWebExchangeDecorator exchangeDecorator = new ServerWebExchangeDecorator(exchange) {
            @Override
            public ServerHttpRequest getRequest() {
               return new RequestLoggingInterceptor(super.getRequest(), logHeaders);
            }

            @Override
            public ServerHttpResponse getResponse() {
               return new ResponseLoggingInterceptor(super.getResponse(), startTime, logHeaders);
            }
         };
         return chain.filter(exchangeDecorator)
            .doOnSuccess(aVoid -> {
               logResponse(startTime, exchangeDecorator.getResponse(), exchangeDecorator.getResponse().getStatusCode().value());
            })
            .doOnError(throwable -> {
               logResponse(startTime, exchangeDecorator.getResponse(), 500);
            });
      }
   }
}
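The Content-Length based empty-body check at the top of filter(...) is simple enough to verify in isolation. Here is a sketch mirroring the condition used by the filter above:

```java
import java.util.Collections;
import java.util.List;

public class EmptyBodyCheck {

    // mirrors: header == null || header.get(0).equals("0")
    static boolean hasEmptyBody(List<String> contentLength) {
        return contentLength == null || contentLength.get(0).equals("0");
    }

    public static void main(String[] args) {
        System.out.println(hasEmptyBody(null));                            // true, header absent (e.g. a plain GET)
        System.out.println(hasEmptyBody(Collections.singletonList("0")));  // true, explicit zero length
        System.out.println(hasEmptyBody(Collections.singletonList("42"))); // false, body present
    }
}
```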

The last step of the implementation is auto-configuration. Here’s the class responsible for it.

@Configuration
@ConfigurationProperties(prefix = "logging.logstash")
public class ReactiveSpringLoggingAutoConfiguration {

   private static final String LOGSTASH_APPENDER_NAME = "LOGSTASH";

   private String url = "localhost:8500";
   private String ignorePatterns;
   private boolean logHeaders;
   private boolean useContentLength;
   private String trustStoreLocation;
   private String trustStorePassword;
   @Value("${spring.application.name:-}")
   String name;

   @Bean
   public UniqueIDGenerator generator() {
      return new UniqueIDGenerator();
   }

   @Bean
   public ReactiveSpringLoggingFilter reactiveSpringLoggingFilter() {
      return new ReactiveSpringLoggingFilter(generator(), ignorePatterns, logHeaders, useContentLength);
   }

   @Bean
   @ConditionalOnProperty("logging.logstash.enabled")
   public LogstashTcpSocketAppender logstashAppender() {
      ...
   }
}

Usage of Spring WebFlux Logging

To be able to create reactive APIs with Spring Boot, we first need to include the Spring WebFlux starter in the Maven dependencies.

<parent>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-parent</artifactId>
   <version>2.1.9.RELEASE</version>
   <relativePath/>
</parent>
<groupId>pl.piomin.test</groupId>
<artifactId>sample-webflux-app</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
   <java.version>11</java.version>
</properties>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>com.github.piomin</groupId>
   <artifactId>reactive-logstash-logging-spring-boot-starter</artifactId>
   <version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>

I have already described how to build microservices architecture with Spring WebFlux and Spring Cloud in one of my previous articles Reactive Microservices with Spring WebFlux and Spring Cloud. So for more information about advanced use cases, you can refer to this article. Here’s a typical controller implementation with Spring WebFlux Mono and Flux objects.

@RestController
public class AccountController {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountController.class);
   
   @Autowired
   private AccountRepository repository;

   @GetMapping("/customer/{customer}")
   public Flux<Account> findByCustomer(@PathVariable("customer") String customerId) {
      LOGGER.info("findByCustomer: customerId={}", customerId);
      return repository.findByCustomerId(customerId);
   }

   @GetMapping
   public Flux<Account> findAll() {
      LOGGER.info("findAll");
      return repository.findAll();
   }

   @GetMapping("/{id}")
   public Mono<Account> findById(@PathVariable("id") String id) {
      LOGGER.info("findById: id={}", id);
      return repository.findById(id);
   }

   @PostMapping
   public Mono<Account> create(@RequestBody Account account) {
      LOGGER.info("create: {}", account);
      return repository.save(account);
   }
   
}

Here are the log entries for GET (empty body) and POST requests.

[image: reactive-logging]

We can customize the library behaviour by overriding the default values of configuration properties with logging.logstash.*. Here’s a typical configuration that enables the embedded Logstash appender, changes the default Logstash URL, includes the list of headers in the logs, and ignores logging of /favicon.ico requests.


logging.logstash:
  enabled: true
  url: 192.168.99.100:8500
  ignorePatterns: .*(favicon).*
  logHeaders: true

With the settings visible above, the logs are sent to the Logstash instance available at 192.168.99.100:8500.
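Note that ignorePatterns is applied as a plain Java regex against the request path (the filter calls path.matches(ignorePatterns)), so the value from the configuration can be checked in isolation:

```java
public class IgnorePatternCheck {
    public static void main(String[] args) {
        String ignorePatterns = ".*(favicon).*"; // value from the configuration above
        System.out.println("/favicon.ico".matches(ignorePatterns));  // true, request is skipped by the logger
        System.out.println("/customer/123".matches(ignorePatterns)); // false, request is logged
    }
}
```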

[image: logstash-1]

Summary

The Spring Boot Logging library now supports logging for synchronous HTTP APIs with Spring MVC and reactive HTTP APIs with Spring WebFlux. A detailed description of the library configuration features may be found in my article Using logstash-logging-spring-boot-starter for logging with Spring Boot and Logstash. You can report bugs or propose new enhancements here: https://github.com/piomin/spring-boot-logging/issues. Any feedback would be very welcome.

Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux https://piotrminkowski.com/2018/10/18/introduction-to-reactive-apis-with-postgres-r2dbc-spring-data-jdbc-and-spring-webflux/ https://piotrminkowski.com/2018/10/18/introduction-to-reactive-apis-with-postgres-r2dbc-spring-data-jdbc-and-spring-webflux/#comments Thu, 18 Oct 2018 07:42:10 +0000 https://piotrminkowski.wordpress.com/?p=6863 There are pretty many technologies listed in the title of this article. Spring WebFlux has been introduced with Spring 5 and Spring Boot 2 as a project for building reactive-stack web applications. I have already described how to use it together with Spring Boot and Spring Cloud for building reactive microservices in that article: Reactive […]

There are quite a few technologies listed in the title of this article. Spring WebFlux has been introduced with Spring 5 and Spring Boot 2 as a project for building reactive-stack web applications. I have already described how to use it together with Spring Boot and Spring Cloud for building reactive microservices in the article Reactive Microservices with Spring WebFlux and Spring Cloud. Spring 5 has also introduced projects supporting reactive access to NoSQL databases like Cassandra, MongoDB or Couchbase. But there was still a lack of support for reactive access to relational databases. The change is coming with the R2DBC (Reactive Relational Database Connectivity) project, which is also being developed by Pivotal members. It seems to be a very interesting initiative; however, it is still at the beginning of the road. Anyway, there is a module for integration with Postgres, and we will use it for our demo application. R2DBC is not the only new and interesting solution described in this article. I will also show you how to use Spring Data JDBC, another really interesting project released recently.
It is worth saying a few words about Spring Data JDBC. This project has already been released and is available in version 1.0. It is a part of the bigger Spring Data framework. It offers a repository abstraction based on JDBC. The main reason for creating this library is to allow access to relational databases the Spring Data way (through CrudRepository interfaces) without including the JPA library in the application dependencies. Of course, JPA is still certainly the main persistence API used in Java applications. Spring Data JDBC aims to be much simpler conceptually than JPA by not implementing popular patterns like lazy loading, caching, dirty checking or sessions. It also provides only very limited support for annotation-based mapping. Finally, it provides an implementation of reactive repositories that uses R2DBC for accessing a relational database. Although that module is still under development (only a SNAPSHOT version is available), we will try to use it in our demo application. Let’s proceed to the implementation.

Including dependencies

We use Kotlin for implementation. So first, we include some required Kotlin dependencies.

<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-stdlib</artifactId>
   <version>${kotlin.version}</version>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.module</groupId>
   <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-test-junit</artifactId>
   <version>${kotlin.version}</version>
   <scope>test</scope>
</dependency>

We should also add kotlin-maven-plugin with support for Spring.


<plugin>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-maven-plugin</artifactId>
   <version>${kotlin.version}</version>
   <executions>
      <execution>
         <id>compile</id>
         <phase>compile</phase>
         <goals>
            <goal>compile</goal>
         </goals>
      </execution>
      <execution>
         <id>test-compile</id>
         <phase>test-compile</phase>
         <goals>
            <goal>test-compile</goal>
         </goals>
      </execution>
   </executions>
   <configuration>
      <args>
         <arg>-Xjsr305=strict</arg>
      </args>
      <compilerPlugins>
         <plugin>spring</plugin>
      </compilerPlugins>
   </configuration>
</plugin>

Then, we may proceed to including the frameworks required for the demo implementation. We need to include a special SNAPSHOT version of Spring Data JDBC dedicated to accessing databases using R2DBC. We also have to add some R2DBC libraries and Spring WebFlux. As you may see below, only Spring WebFlux is available in a stable version (as a part of the Spring Boot release).

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.data</groupId>
   <artifactId>spring-data-jdbc</artifactId>
   <version>1.0.0.r2dbc-SNAPSHOT</version>
</dependency>
<dependency>
   <groupId>io.r2dbc</groupId>
   <artifactId>r2dbc-spi</artifactId>
   <version>1.0.0.M5</version>
</dependency>
<dependency>
   <groupId>io.r2dbc</groupId>
   <artifactId>r2dbc-postgresql</artifactId>
   <version>1.0.0.M5</version>
</dependency>

It is also important to set up dependency management for the Spring Data project.


<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.data</groupId>
         <artifactId>spring-data-releasetrain</artifactId>
         <version>Lovelace-RELEASE</version>
         <scope>import</scope>
         <type>pom</type>
      </dependency>
   </dependencies>
</dependencyManagement>

Repositories

We are using the well-known Spring Data style of CRUD repository implementation. In that case, we need to create an interface that extends the ReactiveCrudRepository interface.
Here's the implementation of a repository for managing Employee objects.

interface EmployeeRepository : ReactiveCrudRepository<Employee, Int> {
    @Query("select id, name, salary, organization_id from employee e where e.organization_id = $1")
    fun findByOrganizationId(organizationId: Int) : Flux<Employee>
}

Here’s another implementation of repository – this time for managing Organization objects.

interface OrganizationRepository : ReactiveCrudRepository<Organization, Int> 

Implementing Entities and DTOs

Kotlin provides a convenient way of creating entity classes by declaring them as data classes. When using Spring Data JDBC, we have to mark the primary key of the entity by annotating the field with @Id. It assumes the key is automatically incremented by the database. If you are not using auto-increment columns, you have to use a BeforeSaveEvent listener, which sets the ID of the entity. However, I tried to set such a listener for my entity, and it just didn't work with the reactive version of Spring Data JDBC.
Here's the implementation of the Employee entity class. It is worth mentioning that Spring Data JDBC automatically maps the class field organizationId to the database column organization_id.

data class Employee(val name: String, val salary: Int, val organizationId: Int) {
    @Id 
    var id: Int? = null
}
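The organizationId → organization_id mapping follows the usual camelCase-to-snake_case convention. As a rough illustration of that naming rule (this is not Spring Data's actual implementation, just the convention it applies by default):

```java
public class SnakeCase {

    // Turn a camelCase property name into the snake_case column name that
    // Spring Data JDBC would derive by default (illustration only).
    static String toSnakeCase(String fieldName) {
        return fieldName.replaceAll("([a-z0-9])([A-Z])", "$1_$2").toLowerCase();
    }

    public static void main(String[] args) {
        System.out.println(toSnakeCase("organizationId")); // organization_id
    }
}
```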

Here’s an implementation of Organization entity class.

data class Organization(var name: String) {
    @Id 
    var id: Int? = null
}

R2DBC does not support any lists or sets. Because I'd like to return a list of employees inside the Organization object in one of the API endpoints, I have created a DTO containing such a list, as shown below.

data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList<Employee> = ArrayList()
    constructor(id: Int?, name: String, employees: MutableList<Employee>) : this(id, name) {
        this.employees = employees
    }
}

The SQL scripts corresponding to the created entities are visible below. The serial column type automatically creates a sequence and attaches it to the id column.

CREATE TABLE employee (
    name character varying NOT NULL,
    salary integer NOT NULL,
    id serial PRIMARY KEY,
    organization_id integer
);
CREATE TABLE organization (
    name character varying NOT NULL,
    id serial PRIMARY KEY
);

Building sample web applications

For demo purposes, we will build two independent applications: employee-service and organization-service. The organization-service application communicates with employee-service using the WebFlux WebClient. It gets the list of employees assigned to the organization and includes them in the response together with the Organization object. The sample applications' source code is available on GitHub in the sample-spring-data-webflux repository: https://github.com/piomin/sample-spring-data-webflux.
Ok, let's begin by declaring the Spring Boot main class. We need to enable Spring Data JDBC repositories by annotating the main class with @EnableJdbcRepositories.

@SpringBootApplication
@EnableJdbcRepositories
class EmployeeApplication

fun main(args: Array<String>) {
    runApplication<EmployeeApplication>(*args)
}

Working with R2DBC and Postgres requires some configuration. Probably due to the early stage of development of Spring Data JDBC and R2DBC, there is no Spring Boot auto-configuration for Postgres. We need to declare the connection factory, database client, and repository inside a @Configuration class.

@Configuration
class EmployeeConfiguration {

    @Bean
    fun repository(factory: R2dbcRepositoryFactory): EmployeeRepository {
        return factory.getRepository(EmployeeRepository::class.java)
    }

    @Bean
    fun factory(client: DatabaseClient): R2dbcRepositoryFactory {
        val context = RelationalMappingContext()
        context.afterPropertiesSet()
        return R2dbcRepositoryFactory(client, context)
    }

    @Bean
    fun databaseClient(factory: ConnectionFactory): DatabaseClient {
        return DatabaseClient.builder().connectionFactory(factory).build()
    }

    @Bean
    fun connectionFactory(): PostgresqlConnectionFactory {
        val config = PostgresqlConnectionConfiguration.builder()
                .host("192.168.99.100")
                .port(5432)
                .database("reactive")
                .username("reactive")
                .password("reactive123")
                .build()

        return PostgresqlConnectionFactory(config)
    }

}

Finally, we can create REST controllers that contain the definitions of our reactive API methods. With Kotlin, it does not take much space. The following controller definition contains three GET methods that allow us to find all employees, all employees assigned to a given organization, or a single employee by id.

@RestController
@RequestMapping("/employees")
class EmployeeController {

    @Autowired
    lateinit var repository : EmployeeRepository

    @GetMapping
    fun findAll() : Flux<Employee> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Employee> = repository.findById(id)

    @GetMapping("/organization/{organizationId}")
    fun findByOrganizationId(@PathVariable organizationId : Int) : Flux<Employee> = repository.findByOrganizationId(organizationId)

    @PostMapping
    fun add(@RequestBody employee: Employee) : Mono<Employee> = repository.save(employee)

}

Inter-service Communication

For the OrganizationController the implementation is a little more complicated. Because organization-service communicates with employee-service, we first need to declare a reactive WebFlux WebClient builder bean.

@Bean
fun clientBuilder() : WebClient.Builder {
   return WebClient.builder()
}

Then, similarly to the repository bean, the builder is injected into the controller. It is used inside the findByIdWithEmployees method for calling the GET /employees/organization/{organizationId} endpoint exposed by employee-service. As you can see in the code fragment below, it provides a reactive API and returns a Flux containing the list of found employees. This list is injected into the OrganizationDTO object using Reactor's zipWith method.

@RestController
@RequestMapping("/organizations")
class OrganizationController {

    @Autowired
    lateinit var repository : OrganizationRepository
    @Autowired
    lateinit var clientBuilder : WebClient.Builder

    @GetMapping
    fun findAll() : Flux<Organization> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Organization> = repository.findById(id)

    @GetMapping("/{id}/withEmployees")
    fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
        val employees : Flux<Employee> = clientBuilder.build().get().uri("http://localhost:8090/employees/organization/$id")
                .retrieve().bodyToFlux(Employee::class.java)
        val org : Mono<Organization> = repository.findById(id)
        return org.zipWith(employees.collectList())
                .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

    @PostMapping
    fun add(@RequestBody organization: Organization) : Mono<Organization> = repository.save(organization)

}
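zipWith waits for both publishers and combines their results into a single value. The same join-two-async-results idea can be sketched with the JDK's CompletableFuture, where thenCombine plays the role of zipWith (a plain-Java analogy, not Reactor; OrgDto and the sample values are made up for the example):

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ZipDemo {

    // Hypothetical DTO pairing an organization name with its employee names.
    public static final class OrgDto {
        public final String name;
        public final List<String> employees;
        OrgDto(String name, List<String> employees) {
            this.name = name;
            this.employees = employees;
        }
    }

    // Two independent async sources, standing in for Mono<Organization>
    // and Flux<Employee>.collectList(); thenCombine merges both results
    // into one object, just like zipWith + map in the controller above.
    static OrgDto load() {
        CompletableFuture<String> org = CompletableFuture.supplyAsync(() -> "Test1");
        CompletableFuture<List<String>> employees =
                CompletableFuture.supplyAsync(() -> List.of("Name1", "Name2"));
        return org.thenCombine(employees, OrgDto::new).join();
    }

    public static void main(String[] args) {
        OrgDto dto = load();
        System.out.println(dto.name + " has " + dto.employees.size() + " employees"); // Test1 has 2 employees
    }
}
```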

How does it work?

Before running the tests, we need to start a Postgres database. Here's the Docker command used for running a Postgres container. It creates a user with a password and sets up a default database.

$ docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=reactive -e POSTGRES_PASSWORD=reactive123 -e POSTGRES_DB=reactive postgres

Then we need to create some test tables, so you have to run the SQL script shown in the section Implementing Entities and DTOs. After that, you can start our test applications. If you do not override the default settings provided inside the application.yml files, employee-service listens on port 8090 and organization-service on port 8095. The following picture illustrates the architecture of our sample system.
spring-data-1
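The ports themselves are plain Spring Boot server.port settings; a minimal fragment of employee-service's application.yml could look like this (an assumed sketch — the repository's actual file may contain more):

```yaml
server:
  port: 8090
```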
Now, let’s add some test data using the reactive API exposed by the applications.

$ curl -d '{"name":"Test1"}' -H "Content-Type: application/json" -X POST http://localhost:8095/organizations
$ curl -d '{"name":"Name1", "salary":5000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees
$ curl -d '{"name":"Name2", "salary":10000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees

Finally, you can call the GET /organizations/{id}/withEmployees method, for example using your web browser. The result should be similar to the one visible in the following picture.

spring-data-2

The post Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux appeared first on Piotr's TechBlog.

Reactive Microservices with Spring WebFlux and Spring Cloud https://piotrminkowski.com/2018/05/04/reactive-microservices-with-spring-webflux-and-spring-cloud/ Fri, 04 May 2018 09:32:45 +0000

The post Reactive Microservices with Spring WebFlux and Spring Cloud appeared first on Piotr's TechBlog.

I already described Spring's reactive support about one year ago in the article Reactive microservices with Spring 5. At that time, the Spring WebFlux project was under active development. Now, after the official release of Spring 5, it is worth taking a look at its current version. Moreover, we will try to put our reactive microservices inside the Spring Cloud ecosystem, which contains such elements as service discovery with Eureka, load balancing with Spring Cloud Commons @LoadBalanced, and an API gateway using Spring Cloud Gateway (also based on WebFlux and Netty). We will also check out Spring's reactive support for NoSQL databases using the example of the Spring Data Reactive Mongo project.

Here’s the figure that illustrates the architecture of our sample system, consisting of two microservices, a discovery server, a gateway, and MongoDB databases. The source code is, as usual, available on GitHub in the sample-spring-cloud-webflux repository.

reactive-1

Let’s describe the steps on the way to creating the system illustrated above.

Step 1. Building reactive application using Spring WebFlux

To enable the Spring WebFlux library for the project, we should include the spring-boot-starter-webflux starter in the dependencies. It brings in some dependent libraries like Reactor and the Netty server.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

The REST controller looks pretty similar to a controller defined for synchronous web services. The only difference is in the types of the returned objects. Instead of a single object we return an instance of Mono, and instead of a list we return an instance of Flux. Thanks to Spring Data Reactive Mongo, we don’t have to do anything more than call the needed method on the repository bean.


@RestController
public class AccountController {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountController.class);

   @Autowired
   private AccountRepository repository;

   @GetMapping("/customer/{customer}")
   public Flux<Account> findByCustomer(@PathVariable("customer") String customerId) {
      LOGGER.info("findByCustomer: customerId={}", customerId);
      return repository.findByCustomerId(customerId);
   }

   @GetMapping
   public Flux<Account> findAll() {
      LOGGER.info("findAll");
      return repository.findAll();
   }

   @GetMapping("/{id}")
   public Mono<Account> findById(@PathVariable("id") String id) {
      LOGGER.info("findById: id={}", id);
      return repository.findById(id);
   }

   @PostMapping
   public Mono<Account> create(@RequestBody Account account) {
      LOGGER.info("create: {}", account);
      return repository.save(account);
   }

}
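Mono and Flux implement the Reactive Streams contract (onSubscribe, onNext, onComplete, onError) — the same contract the JDK ships as java.util.concurrent.Flow. A minimal plain-JDK sketch of that publish/subscribe lifecycle, without Reactor, shows the signal flow a Flux-returning endpoint relies on:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

public class FlowDemo {

    // Subscribe, request unbounded demand, and gather every onNext signal
    // until onComplete fires — the same lifecycle a Flux goes through.
    static List<String> collect(List<String> input) {
        List<String> out = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
                public void onNext(String item) { out.add(item); }
                public void onError(Throwable t) { done.countDown(); }
                public void onComplete() { done.countDown(); }
            });
            input.forEach(publisher::submit);
        } // close() signals onComplete to the subscriber
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(collect(List.of("a", "b", "c"))); // [a, b, c]
    }
}
```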

Step 2. Integrate an application with database using Spring Data Reactive Mongo

The implementation of integration between application and database is also very simple. First, we need to include starter spring-boot-starter-data-mongodb-reactive to the project dependencies.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

The support for reactive Mongo repositories is automatically enabled after including the starter. The next step is to declare an entity with ORM mappings. The following class is also returned as the response by AccountController.

@Document
public class Account {

   @Id
   private String id;
   private String number;
   private String customerId;
   private int amount;

   ...

}

Finally, we may create a repository interface that extends ReactiveCrudRepository. It follows the patterns implemented by Spring Data JPA and provides some basic methods for CRUD operations. It also allows you to define query methods by name, which are automatically mapped to queries. The only difference in comparison with standard Spring Data JPA repositories is in the method signatures: the returned objects are wrapped in Mono and Flux.

public interface AccountRepository extends ReactiveCrudRepository<Account, String> {

   Flux<Account> findByCustomerId(String customerId);

}

In this example I used a Docker container for running MongoDB locally. Because I run Docker on Windows using Docker Toolbox, the default address of the Docker machine is 192.168.99.100. Here’s the configuration of the data source in the application.yml file.

spring:
  data:
    mongodb:
      uri: mongodb://192.168.99.100/test

Step 3. Enabling service discovery using Eureka

Integration with Spring Cloud Eureka is pretty much the same as for synchronous REST microservices. To enable the discovery client, we should first include the spring-cloud-starter-netflix-eureka-client starter in the project dependencies.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

Then we have to enable it using the @EnableDiscoveryClient annotation.

@SpringBootApplication
@EnableDiscoveryClient
public class AccountApplication {

   public static void main(String[] args) {
      SpringApplication.run(AccountApplication.class, args);
   }

}

The microservice will automatically register itself in Eureka. Of course, we may run more than one instance of every service. Here’s the screen illustrating the Eureka Dashboard (http://localhost:8761) after running two instances of account-service and a single instance of customer-service. I would not like to go into the details of running an application with an embedded Eureka server. You may refer to my previous article for details: Quick Guide to Microservices with Spring Boot 2.0, Eureka and Spring Cloud. The Eureka server is available as the discovery-service module.

spring-reactive

Step 4. Inter-service communication between reactive microservices with WebClient

Inter-service communication is realized by the WebClient from the Spring WebFlux project. The same as for RestTemplate, you should annotate it with Spring Cloud Commons @LoadBalanced. It enables integration with service discovery and load balancing using the Netflix OSS Ribbon client. So, the first step is to declare a client builder bean with the @LoadBalanced annotation.

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
   return WebClient.builder();
}

Then we may inject the WebClient.Builder into the REST controller. Communication with account-service is implemented inside GET /{id}/with-accounts, where we first search for the customer entity using a reactive Spring Data repository. It returns a Mono, while the WebClient returns a Flux. Now, our main goal is to merge those two publishers and return a single Mono with the list of accounts taken from the Flux, without blocking the stream. The following fragment of code illustrates how I used WebClient to communicate with the other microservice, and then merge the response and the result from the repository into a single Mono. This merge could probably be done in a more “elegant” way, so feel free to create a pull request with your proposal.

@Autowired
private WebClient.Builder webClientBuilder;

@GetMapping("/{id}/with-accounts")
public Mono<Customer> findByIdWithAccounts(@PathVariable("id") String id) {
   LOGGER.info("findByIdWithAccounts: id={}", id);
   Flux<Account> accounts = webClientBuilder.build().get().uri("http://account-service/customer/{customer}", id).retrieve().bodyToFlux(Account.class);
   return accounts
      .collectList()
      .map(a -> new Customer(a))
      .mergeWith(repository.findById(id))
      .collectList()
      .map(CustomerMapper::map);
}

Step 5. Building API gateway using Spring Cloud Gateway

Spring Cloud Gateway is one of the newest Spring Cloud projects. It is built on top of Spring WebFlux, and thanks to that we may use it as a gateway to our sample system based on reactive Spring Boot microservices. Similar to Spring WebFlux applications, it runs on an embedded Netty server. To enable it for a Spring Boot application, just include the following dependency in your project.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

We should also enable a discovery client in order to allow the gateway to fetch the list of registered microservices. However, there is no need to register the gateway application in Eureka. To disable registration, you may set the property eureka.client.registerWithEureka to false inside the application.yml file.
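A minimal fragment of the gateway's application.yml implementing that opt-out could look like this (a sketch; the property names follow the standard Eureka client configuration):

```yaml
eureka:
  client:
    registerWithEureka: false
```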

@SpringBootApplication
@EnableDiscoveryClient
public class GatewayApplication {

   public static void main(String[] args) {
      SpringApplication.run(GatewayApplication.class, args);
   }

}

By default, Spring Cloud Gateway does not enable integration with service discovery. To enable it, we should set the property spring.cloud.gateway.discovery.locator.enabled to true. Now, the last thing that should be done is the configuration of the routes. Spring Cloud Gateway provides two types of components that may be configured inside routes: filters and predicates. Predicates are used for matching HTTP requests with a route, while filters can be used to modify requests and responses before or after sending the downstream request. Here’s the full configuration of the gateway. It enables the service discovery locator and defines two routes based on entries in the service registry. We use the Path route predicate factory for matching the incoming requests, and the RewritePath gateway filter factory for modifying the requested path to adapt it to the format exposed by the downstream services (endpoints are exposed under the root path /, while the gateway exposes them under the paths /account and /customer).

spring:
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
      - id: account-service
        uri: lb://account-service
        predicates:
        - Path=/account/**
        filters:
        - RewritePath=/account/(?<path>.*), /$\{path}
      - id: customer-service
        uri: lb://customer-service
        predicates:
        - Path=/customer/**
        filters:
        - RewritePath=/customer/(?<path>.*), /$\{path}
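The RewritePath filter is driven by an ordinary Java regular expression with a named capture group path (the backslash in /$\{path} only escapes Spring's property placeholder syntax in YAML). The rewrite itself can be reproduced with java.util.regex alone:

```java
public class RewriteDemo {

    // Reproduce the route's rewrite: strip the /account prefix by capturing
    // the remainder into the named group "path" and substituting it back.
    static String rewrite(String requestPath) {
        return requestPath.replaceAll("/account/(?<path>.*)", "/${path}");
    }

    public static void main(String[] args) {
        System.out.println(rewrite("/account/customer/1")); // /customer/1
    }
}
```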

Step 6. Testing reactive microservices with Spring Boot

Before making some tests, let’s just recap our sample system. We have two microservices, account-service and customer-service, that use MongoDB as a database. The customer-service microservice calls the GET /customer/{customer} endpoint exposed by account-service. The URL of account-service is taken from Eureka. The whole sample system is hidden behind the gateway, which is available under the address localhost:8090.
Now, the first step is to run MongoDB in a Docker container. After executing the following command, Mongo is available under the address 192.168.99.100:27017.

$ docker run -d --name mongo -p 27017:27017 mongo

Then we may proceed to running discovery-service. Eureka is available under its default address localhost:8761. You may run it using your IDE or just by executing the command java -jar target/discovery-service-1.0-SNAPSHOT.jar. The same rule applies to our sample microservices. However, account-service needs to be run in two instances, so you need to override the default HTTP port when running the second instance using the -Dserver.port VM argument, for example java -jar -Dserver.port=2223 target/account-service-1.0-SNAPSHOT.jar. Finally, after running gateway-service, we may add some test data.

$ curl --header "Content-Type: application/json" --request POST --data '{"firstName": "John","lastName": "Scott","age": 30}' http://localhost:8090/customer
{"id": "5aec1debfa656c0b38b952b4","firstName": "John","lastName": "Scott","age": 30,"accounts": null}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567890","amount": 5000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e86fa656c11d4c655fb","number": "1234567890","customerId": "5aec1debfa656c0b38b952b4","amount": 5000}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567891","amount": 12000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e91fa656c11d4c655fc","number": "1234567891","customerId": "5aec1debfa656c0b38b952b4","amount": 12000}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567892","amount": 2000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e99fa656c11d4c655fd","number": "1234567892","customerId": "5aec1debfa656c0b38b952b4","amount": 2000}

To test inter-service communication, just call the GET /customer/{id}/with-accounts endpoint on gateway-service. It forwards the request to customer-service, which then calls the endpoint exposed by account-service using the reactive WebClient. The result is visible below.

reactive-2

Conclusion

Since Spring 5 and Spring Boot 2.0, there is a full range of available ways to build a microservices-based architecture. We can build a standard synchronous system using one-to-one communication with the Spring Cloud Netflix project, messaging microservices based on a message broker and the publish/subscribe communication model with Spring Cloud Stream, and finally asynchronous, reactive microservices with Spring WebFlux. The main goal of this article is to show you how to use Spring WebFlux together with Spring Cloud projects in order to provide mechanisms like service discovery, load balancing, or an API gateway for reactive microservices built on top of Spring Boot. Before Spring 5, the lack of support for reactive microservices was one of the drawbacks of the Spring framework, but with Spring WebFlux this is no longer the case. Moreover, we may leverage Spring's reactive support for the most popular NoSQL databases like MongoDB or Cassandra, and easily place our reactive microservices inside one system together with synchronous REST microservices.
