Netty Archives - Piotr's TechBlog https://piotrminkowski.com/tag/netty/ Java, Spring, Kotlin, microservices, Kubernetes, containers Mon, 24 May 2021 06:41:04 +0000 en-US hourly 1 https://wordpress.org/?v=6.9.1 https://i0.wp.com/piotrminkowski.com/wp-content/uploads/2020/08/cropped-me-2-tr-x-1.png?fit=32%2C32&ssl=1 Netty Archives - Piotr's TechBlog https://piotrminkowski.com/tag/netty/ 32 32 181738725 SSL with Spring WebFlux and Vault PKI https://piotrminkowski.com/2021/05/24/ssl-with-spring-webflux-and-vault-pki/ https://piotrminkowski.com/2021/05/24/ssl-with-spring-webflux-and-vault-pki/#respond Mon, 24 May 2021 06:40:59 +0000 https://piotrminkowski.com/?p=9745 In this article, you will learn how to configure the Vault PKI engine and integrate it with Spring WebFlux. With Vault PKI you can easily generate X.509 certificates signed by the CA. Then your application may get a certificate through a REST API. Its TTL is relatively short. It is unique per each application instance. […]

The post SSL with Spring WebFlux and Vault PKI appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to configure the Vault PKI engine and integrate it with Spring WebFlux. With Vault PKI you can easily generate X.509 certificates signed by the CA. Then your application may get a certificate through a REST API. Its TTL is relatively short. It is unique per each application instance. Also, we can use Spring VaultTemplate to simplify integration with Vault API.

Let’s say a little bit more about Vault. It allows you to secure, store, and control access to tokens, passwords, certificates, and encryption keys using UI, CLI, or HTTP API. It is a really powerful tool. With Vault, instead of a traditional approach, you can manage your security in a more dynamic, cloud-native way. For example, you can integrate Vault with a database backend, and then generate user login and password on the fly. Moreover, for Spring Boot applications you can take an advantage of the Spring Cloud Vault project. If you are interested in more information about it read my article Testing Spring Boot Integration with Vault and Postgres using Testcontainers.

That’s not all. You can integrate Vault with other tools from Hashicorp like Consul or Nomad. In other words, it allows us to build a cloud-native platform in a secure way. For more details please refer to the article Secure Spring Cloud Microservices with Vault and Nomad.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my repository sample-spring-cloud-security. Then you should go to the gateway-service directory, and just follow my instructions in the next sections. The sample application acts as an API gateway for microservices. We use Spring Cloud Gateway. Since it is built on top of Spring WebFlux, that example is perfectly right for our current article.

1. Running Vault

We will run Vault inside the Docker container in development mode. The server running in that mode does not require any further setup, it is ready to use just after startup. After startup, our instance of Vault is available on port 8200. The version of Vault used in this article is 1.7.1.

$ docker run --cap-add=IPC_LOCK -d --name vault -p 8200:8200 vault

It is possible to login using different methods, but the most suitable way for us is through a token. To do that we have to display container logs using command docker logs vault, and then copy Root Token as shown below.

spring-webflux-vault-pki-config-engine

Finally, we are able to login to the Vault web console.

spring-webflux-vault-pki-config-role

2. Enable and configure Vault PKI

There are two ways to enable and configure Vault PKI: with CLI or via UI. Most articles describe a list of CLI commands required to configure the PKI engine. One of them is available on the Hashicorp site. However, I’m going to use Vault UI for that. So first, let’s enable a new engine on the main site.

spring-webflux-vault-pki-config-ca

Then, we need to choose a type of engine to enable. In our case it is the option PKI Certificates.

During creation let’s leave a default name pki. Then, we need to navigate into the newly enabled engine and create a new role. A role is used for generating certificates. The name of my role is default. This name is important because we would have to call it from the code using VaultTemplate.

spring-webflux-vault-pki-config-generate

The type of the key used in our role is rsa.

Before creating it, we should set some important parameters. One of them is TTL, which is set to 3 days. Also, don’t forget to check fields Allow any name and Require Common Name. Both of them are related to the CN field inside the certificate. Because we will store a username inside the CN field, we need to allow any name for it.

Once a role is created, we need to configure CA. To do that, we should first switch to the Configuration tab and then click Configure button.

After that, let’s choose Configure CA.

Finally, we can create a new CA certificate. We should leave the root value as CA Type and internal as Type. The default key format is pem. We can also set a Common Name for the CA certificate. For both role and CA it worth filling additional fields like e.g. the name of an organization or organization unit.

3. Integrating Spring WebFlux with Vault PKI

Let’s begin with dependencies. We need to include a Spring WebFlux starter for reactive API and a Spring Security starter to secure API. Integration with Vault API can be provided by spring-vault-core. I also had to include Jackson libraries in order to be able to start the application with Spring Vault.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-cloud-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-core</artifactId>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-databind</artifactId>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.core</groupId>
   <artifactId>jackson-annotations</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.vault</groupId>
   <artifactId>spring-vault-core</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-security</artifactId>
</dependency>

Then, let’s configure a VaultTemplate bean. It should use the http scheme and an authentication token injected from the configuration.

@Value("vault.token")
private String vaultToken;

@Bean
VaultTemplate vaultTemplate() {
   VaultEndpoint e =  new VaultEndpoint();
   e.setScheme("http");
   VaultTemplate template = new VaultTemplate(e, new TokenAuthentication(vaultToken));
   return template;
}

The VaultTemplate provides dedicated support for interaction with the PKI engine. We just need to call the method opsForPki passing the PKI engine name to obtain the VaultPkiOperations instance (1). Then we need to build a certificate request with VaultCertificateRequest. We may set several parameters, but the most important is CN and certificate TTL (2). Finally, we should invoke the issueCertificate method passing the request and the name of the role configured on Vault PKI (3). Our certificate has been successfully generated. Now, we just need to obtain it from the response. The generated certificate, CA certificate, and a private key are available inside the CertificateBundle object, which is returned by the method.

private CertificateBundle issueCertificate() throws Exception {
   VaultPkiOperations pkiOperations = vaultTemplate.opsForPki("pki"); // (1)
   VaultCertificateRequest request = VaultCertificateRequest.builder()
        .ttl(Duration.ofHours(12))
        .commonName("localhost")
        .build(); // (2)
   VaultCertificateResponse response = pkiOperations.issueCertificate("default", request); // (3)
   CertificateBundle certificateBundle = response.getRequiredData(); // (4)

   log.info("Cert-SerialNumber: {}", certificateBundle.getSerialNumber());
   return certificateBundle;
}

4. Enable Spring WebFlux security

We have already integrated Spring WebFlux with Vault PKI in the previous section. Finally, we can proceed to the last step in our implementation – enable security based on X.509 certificates. To do that we need to create a @Configuration class. It should be annotated with @EnableWebFluxSecurity (1). We also need to obtain a username from the certificate by implementing the principal extractor. We are going to use SubjectDnX509PrincipalExtractor (2) with the right regex that reads data from the CN field. The final configuration disables CSRF, basic auth, and enables SSL with X509 certificates (3). We also need to provide an implementation of the UserDetails interface (4) with a single username piotrm.

@Configuration
@EnableWebFluxSecurity // (1)
public class SecurityConfig {

   @Bean
   public SecurityWebFilterChain filterChain(ServerHttpSecurity http) {
      SubjectDnX509PrincipalExtractor principalExtractor =
             new SubjectDnX509PrincipalExtractor(); // (2)
      principalExtractor.setSubjectDnRegex("CN=(.*?)(?:,|$)");

      return http.csrf().disable()
             .authorizeExchange(exchanges -> 
                    exchanges.anyExchange().authenticated())
             .x509()
                .principalExtractor(principalExtractor)
             .and()
                .httpBasic().disable().build(); // (3)
   }

   @Bean
   public MapReactiveUserDetailsService users() { // (4)
      UserDetails user1 = User.builder()
             .username("piotrm")
             .password("{noop}1234")
             .roles("USER")
             .build();
      return new MapReactiveUserDetailsService(user1);
   }
}

In the last step, we need to override the Netty server SSL configuration on runtime. Our customizer should implement the WebServerFactoryCustomizer interface, and use NettyReactiveWebServerFactory. Inside customize method we first invoke the method issueCertificate responsible for generating a certificate in Vault (you can refer to the previous section to see the implementation of that method) (1). The CertificateBundle contains all required data. We can invoke the method createKeyStore on it to create a keystore (2) and then save it in the file (3).

To override Netty SSL settings we should use the Ssl object. The client authentication needs to be enabled (4). We will also set the location of the currently created KeyStore (5). After that, we may proceed to the truststore creation. The issuer certificate may be obtained from CertificateBundle (6). Then we should create a new keystore, and set the CA certificate as an entry there (7). Finally, we will save the truststore to the file and set its location in the Ssl object.

@Component
@Slf4j
public class GatewayServerCustomizer implements 
         WebServerFactoryCustomizer<NettyReactiveWebServerFactory> {

   @SneakyThrows
   @Override
   public void customize(NettyReactiveWebServerFactory factory) {
      String keyAlias = "vault";
      CertificateBundle bundle = issueCertificate(); // (1)
      KeyStore keyStore = bundle.createKeyStore(keyAlias); // (2)
      String keyStorePath = saveKeyStoreToFile("server-key.pkcs12", keyStore); // (3)

      Ssl ssl = new Ssl();
      ssl.setEnabled(true);
      ssl.setClientAuth(Ssl.ClientAuth.NEED); // (4)

      ssl.setKeyStore(keyStorePath); // (5)
      ssl.setKeyAlias(keyAlias);
      ssl.setKeyStoreType(keyStore.getType());
      ssl.setKeyPassword("");
      ssl.setKeyStorePassword("123456");

      X509Certificate caCert = bundle.getX509IssuerCertificate(); // (6)
      log.info("CA-SerialNumber: {}", caCert.getSerialNumber());
      KeyStore trustStore = KeyStore.getInstance("pkcs12");
      trustStore.load(null, null);
      trustStore.setCertificateEntry("ca", caCert); // (7)
      String trustStorePath = saveKeyStoreToFile("server-trust.pkcs12", trustStore); // (8)

      ssl.setTrustStore(trustStorePath); // (9)
      ssl.setTrustStorePassword("123456");
      ssl.setTrustStoreType(trustStore.getType());

      factory.setSsl(ssl);
      factory.setPort(8443);
   }
}

5. Testing Spring WebFlux with Vault PKI

Let’s run our sample application. It is available under the 8443 port. We will test it using the curl tool. Before doing it we need to generate a client certificate with a private key. Let’s go to the Vault UI once again. If you click a default Vault UI redirects to form responsible for certificate generation as shown below. In the Common Name field, we should provide the test username configured inside the UserDetails implementation. For me, it is piotrm. Also, don’t forget to set the right TTL.

After generating a certificate you will be redirected to the site with the results. First, you should copy the string with your certificate, and save it to the file. For me it is piotrm.crt. You can also display the content of a generated private key. Then, do the same as with the certificate. My filename is piotrm.key.

Finally, we can send a test request to our sample application passing the names of key and certificate files.

$ curl https://localhost:8443/hello -v --key piotrm.key --cert piotrm.crt

The post SSL with Spring WebFlux and Vault PKI appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/05/24/ssl-with-spring-webflux-and-vault-pki/feed/ 0 9745
A Deep Dive Into Spring WebFlux Threading Model https://piotrminkowski.com/2020/03/30/a-deep-dive-into-spring-webflux-threading-model/ https://piotrminkowski.com/2020/03/30/a-deep-dive-into-spring-webflux-threading-model/#comments Mon, 30 Mar 2020 11:35:58 +0000 http://piotrminkowski.com/?p=7878 If you are building reactive applications with Spring WebFlux, typically you will use Reactor Netty as a default embedded server. Reactor Netty is currently one of the most popular asynchronous event-driven frameworks. It provides non-blocking and backpressure-ready TCP, HTTP, and UDP clients and servers. In fact, the most important difference between synchronous and reactive frameworks […]

The post A Deep Dive Into Spring WebFlux Threading Model appeared first on Piotr's TechBlog.

]]>
If you are building reactive applications with Spring WebFlux, typically you will use Reactor Netty as a default embedded server. Reactor Netty is currently one of the most popular asynchronous event-driven frameworks. It provides non-blocking and backpressure-ready TCP, HTTP, and UDP clients and servers. In fact, the most important difference between synchronous and reactive frameworks is in their threading and concurrency model. Without understanding how reactive framework handles threads, you won’t fully understand reactivity. Let’s take a closer look on the threading model realized by Spring WebFlux and Project Reactor.
We should begin from the following fragment in Spring Framework Reference.

On a “vanilla” Spring WebFlux server (for example, no data access nor other optional dependencies), you can expect one thread for the server and several others for request processing (typically as many as the number of CPU cores).

So, the number of processing threads that handle incoming requests is related to the number of CPU cores. If you have 4 cores and you didn’t enable hyper-threading mechanism you would have 4 worker threads in the pool. You can determine the number of cores available to the JVM by using the static method, availableProcessors from class Runtime: Runtime.getRuntime().availableProcessors().

Example

At that moment, it is worth referring to the example application. It is a simple Spring Boot application that exposes reactive API built using Spring Webflux. The source code is available on GitHub: https://github.com/piomin/sample-spring-webflux.git. You can run it after building using java -jar command, or just simply run it from IDE. It also contains Dockerfile in the root directory, so you can build and run it inside a Docker container.

Limit CPU

With Docker you can easily limit the number of CPU cores “visible” for your application. When running a Docker container you just need to set parameter cpus as shown below.

$ docker run -d --name webflux --cpus="1" -p 8080:8080 piomin/sample-spring-webflux

The sample application is printing the number of available CPU cores during startup. Here’s the fragment of code responsible for it.

@SpringBootApplication
public class SampleSpringWebFluxApp {

    private static final Logger LOGGER = LoggerFactory.getLogger(SampleSpringWebFluxApp.class);

    public static void main(String[] args) {
        SpringApplication.run(SampleSpringWebFluxApp.class, args);
    }

    @PostConstruct
    public void init() {
        LOGGER.info("CPU: {}", Runtime.getRuntime().availableProcessors());
    }

}

Now, we can send some test requests to one of the available endpoints.

$ curl http://192.168.99.100:8080/persons/json

After that we may take a look at the logs generated by application using docker logs -f webflux command. The result is pretty unexpectable. We still have four worker threads in the pool, although the application uses only a single CPU core. Here’s the screen with application logs.

spring-webflux-threading-model-docker

An explanation is pretty simple. Here’s the fragment of Reactor Netty source code that illustrates the algorithm used. The minimum number of worker threads in the pool is 4.

spring-webflux-threading-model-reactor-netty

Well, it exactly explains why my application uses the same number of threads when it is running on my laptop with 4 CPUs, and on the Docker container which limits used CPU to a single core. You should also remember that Java is able to see the number of CPU cores inside a Docker container since version 10. If you use Java 8 on Docker your application sees the CPU on the machine not inside the container, which may have a huge impact on the consumed resources.

WebClient Thread Pool

Let’s consider the following implementation of reactive endpoint. First, we are emitting a stream of three Person objects, and then we are merging it with the stream returned by the WebClient.

@Autowired
WebClient client;

private Stream<Person> prepareStreamPart1() {
   return Stream.of(
      new Person(1, "Name01", "Surname01", 11),
      new Person(2, "Name02", "Surname02", 22),
      new Person(3, "Name03", "Surname03", 33)
   );
}

@GetMapping("/integration/{param}")
public Flux<Person> findPersonsIntegration(@PathVariable("param") String param) {
   return Flux.fromStream(this::prepareStreamPart1).log()
      .mergeWith(
         client.get().uri("/slow/" + param)
            .retrieve()
            .bodyToFlux(Person.class)
            .log()
      );
}

In order to properly understand what exactly happens with thread pool let’s recall the following fragment from Spring Framework Reference.

The reactive WebClient operates in event loop style. So you can see a small, fixed number of processing threads related to that (for example, reactor-http-nio- with the Reactor Netty connector). However, if Reactor Netty is used for both client and server, the two share event loop resources by default.

Typically, you have 4 worker threads, which are shared between both server processing and client-side communication with other resources. Since, we have enabled detailed logging with log method, we may easily analyse step by step thread pooling for the single request sent to the test endpoint. The incoming request is being processed by reactor-http-nio-2 thread. Then WebClient is called and it subscribes to the result stream in the same thread. But the result is published on the different thread reactor-http-nio-3, a first available in the pool.

spring-webflux-threading-model-webclient-logs

We can also analyze the subsequent logs for a single selected thread. In the following fragment of logs for thread reactor-http-nio-1 you may see that WebClient does not block the thread while waiting for a result from a calling resource. The method onComplete is called here for times in row without any onSubscribe call. The idea of non-blocking threads is the main concept around reactive programming, which allows us to handle many requests using relatively small numbers of threads (4 in that case).


15:02:25.875 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.875 --- [ctor-http-nio-1] : request(32)
15:02:25.891 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:25.891 --- [ctor-http-nio-1] : | request(32)
15:02:25.891 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:25.892 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:25.892 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:25.892 --- [ctor-http-nio-1] : | onComplete()
15:02:25.894 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.895 --- [ctor-http-nio-1] : request(32)
15:02:25.924 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:25.925 --- [ctor-http-nio-1] : | request(32)
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:25.925 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:25.926 --- [ctor-http-nio-1] : | onComplete()
15:02:25.927 --- [ctor-http-nio-1] : onSubscribe(MonoFlatMapMany.FlatMapManyMain)
15:02:25.927 --- [ctor-http-nio-1] : request(32)
15:02:26.239 --- [ctor-http-nio-1] : onNext(Person(id=38483, firstName=Name6, lastName=Surname6, age=50))
15:02:26.241 --- [ctor-http-nio-1] : onNext(Person(id=59103, firstName=Name6, lastName=Surname6, age=6))
15:02:26.241 --- [ctor-http-nio-1] : onNext(Person(id=67587, firstName=Name6, lastName=Surname6, age=36))
15:02:26.242 --- [ctor-http-nio-1] : onComplete()
15:02:26.283 --- [ctor-http-nio-1] : onNext(Person(id=40006, firstName=Name2, lastName=Surname2, age=41))
15:02:26.284 --- [ctor-http-nio-1] : onNext(Person(id=94727, firstName=Name2, lastName=Surname2, age=79))
15:02:26.287 --- [ctor-http-nio-1] : onNext(Person(id=19891, firstName=Name2, lastName=Surname2, age=84))
15:02:26.287 --- [ctor-http-nio-1] : onComplete()
15:02:26.292 --- [ctor-http-nio-1] : onNext(Person(id=67550, firstName=Name14, lastName=Surname14, age=92))
15:02:26.293 --- [ctor-http-nio-1] : onNext(Person(id=85063, firstName=Name14, lastName=Surname14, age=76))
15:02:26.293 --- [ctor-http-nio-1] : onNext(Person(id=54572, firstName=Name14, lastName=Surname14, age=18))
15:02:26.293 --- [ctor-http-nio-1] : onComplete()
15:02:26.295 --- [ctor-http-nio-1] : onNext(Person(id=49896, firstName=Name16, lastName=Surname16, age=20))
15:02:26.295 --- [ctor-http-nio-1] : onNext(Person(id=85684, firstName=Name16, lastName=Surname16, age=39))
15:02:26.296 --- [ctor-http-nio-1] : onNext(Person(id=2605, firstName=Name16, lastName=Surname16, age=75))
15:02:26.296 --- [ctor-http-nio-1] : onComplete()
15:02:26.337 --- [ctor-http-nio-1] : | onSubscribe([Synchronous Fuseable] FluxIterable.IterableSubscription)
15:02:26.338 --- [ctor-http-nio-1] : | request(32)
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=1, firstName=Name01, lastName=Surname01, age=11))
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=2, firstName=Name02, lastName=Surname02, age=22))
15:02:26.339 --- [ctor-http-nio-1] : | onNext(Person(id=3, firstName=Name03, lastName=Surname03, age=33))
15:02:26.340 --- [ctor-http-nio-1] : | onComplete()

Long response time

If you are using WebClient for communication with other resources you should be able to handle a long response time. Of course WebClient does not block the thread, but sometimes it is desired to use another thread pool than the main worker thread pool shared with the server. To do that you should use publishOn method. Spring WebFlux provides thread pool abstractions called Schedulers. You may use it to create different concurrency strategies. If you prefer to have a full control of the minimum and maximum number of threads in the pool you should define your own task executor as shown below. The minimum size of the pool is 5, while the maximum size is 10.

@Bean
public ThreadPoolTaskExecutor taskExecutor() {
   ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
   executor.setCorePoolSize(5);
   executor.setMaxPoolSize(10);
   executor.setQueueCapacity(100);
   executor.setThreadNamePrefix("slow-");
   executor.initialize();
   return executor;
}

Then you just need to set it as a default scheduler for publishOn method. Here’s our second test endpoint that sets a dedicated thread pool for WebClient.

@GetMapping("/integration-in-different-pool/{param}")
public Flux<Person> findPersonsIntegrationInDifferentPool(@PathVariable("param") String param) {
   return Flux.fromStream(this::prepareStreamPart1).log()
      .mergeWith(
         client.get().uri("/slow/" + param)
            .retrieve()
            .bodyToFlux(Person.class)
            .log()
            .publishOn(Schedulers.fromExecutor(taskExecutor))
         );
}

Performance Testing

Now, our main goal is to compare performance of the both implemented endpoints. First of them using the same thread pool for server requests processing and for WebClient, while the second creates separate thread pool for WebClient with publishOn. The first step of preparing such a comparison test is to create a mock with delayed response. We will use MockWebServer provided by okhttp. It caches requests send to /slow/{param} endpoint, emits the stream with Person objects delayed 200 ms.

public static MockWebServer mockBackEnd;

@BeforeClass
public static void setUp() throws IOException {
   final Dispatcher dispatcher = new Dispatcher() {

      @NotNull
      @Override
      public MockResponse dispatch(@NotNull RecordedRequest recordedRequest) throws InterruptedException {
         String pathParam = recordedRequest.getPath().replaceAll("/slow/", "");
         List personsPart2 = List.of(new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)),
            new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)),
            new Person(r.nextInt(100000), "Name" + pathParam, "Surname" + pathParam, r.nextInt(100)));
         try {
            return new MockResponse()
               .setResponseCode(200)
               .setBody(mapper.writeValueAsString(personsPart2))
               .setHeader("Content-Type", "application/json")
               .setBodyDelay(200, TimeUnit.MILLISECONDS);
         }
         catch (JsonProcessingException e) {
            e.printStackTrace();
         }
         return null;
      }
   };
   mockBackEnd = new MockWebServer();
   mockBackEnd.setDispatcher(dispatcher);
   mockBackEnd.start();
   System.setProperty("target.uri", "http://localhost:" + mockBackEnd.getPort());
}

@AfterClass
public static void tearDown() throws IOException {
   mockBackEnd.shutdown();
}

The last step in our implementation is to create test methods. For calling both sample endpoints we can use synchronous TestRestTemplate, since it doesn’t impact on the results. For benchmarking I’m using junit-benchmarks library, which allows me to define the number of concurrent threads and maximum number of attempts. To enable that library for JUnit test we just need to define @Rule. Here’s the implementation of our performance test. We are running Spring Boot in test mode and then starting 50 concurrent threads with 300 attempts.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
@RunWith(SpringRunner.class)
public class PerformanceSpringWebFluxTest {

    private static final Logger LOGGER = LoggerFactory.getLogger(PerformanceSpringWebFluxTest.class);
    private static ObjectMapper mapper = new ObjectMapper();
    private static Random r = new Random();
    private static int i = 0;

    @Rule
    public TestRule benchmarkRun = new BenchmarkRule();
    @Autowired
    TestRestTemplate template;
   
    @Test
    @BenchmarkOptions(warmupRounds = 10, concurrency = 50, benchmarkRounds = 300)
    public void testPerformance() throws InterruptedException {
        ResponseEntity<Person[]> r = template.exchange("/persons/integration/{param}", HttpMethod.GET, null, Person[].class, ++i);
        Assert.assertEquals(200, r.getStatusCodeValue());
        Assert.assertNotNull(r.getBody());
        Assert.assertEquals(6, r.getBody().length);
    }

    @Test
    @BenchmarkOptions(warmupRounds = 10, concurrency = 50, benchmarkRounds = 300)
    public void testPerformanceInDifferentPool() throws InterruptedException {
        ResponseEntity<Person[]> r = template.exchange("/persons/integration-in-different-pool/{param}", HttpMethod.GET, null, Person[].class, ++i);
        Assert.assertEquals(200, r.getStatusCodeValue());
        Assert.assertNotNull(r.getBody());
        Assert.assertEquals(6, r.getBody().length);
    }
}

Here are the results. For an endpoint with a default Spring WebFlux threading model assuming a shareable thread pool between server processing and client requests there is ~5.5s for processing all 300 requests. For a method with a separate thread pool for WebClient we have ~2.9s for all 300 requests. In general, the more threads you will use in your thread the ratio of average processing time between first and second model would be changed in favor of the second model (with separate thread pool for client).

spring-webflux-threading-model-testresult1

And the result for separated WebClient thread pool.

spring-webflux-threading-model-testresult2

Using Spring Boot Actuator

Sometimes, you need to use some additional libraries in your WebFlux application. One of them is Spring Boot Actuator, which may be especially required if you have to expose health check or metrics endpoints. To find out how Spring WebFlux is handling threading for Spring Boot Actuator we will profile our application with YourKit. After running the application you should generate some traffic to /actuator/health endpoint. Here’s the screen with threads from YourKit. As you see a worker thread (reactor-http-nio-*) delegates long-running job threads available in bounded elastic pool (boundedElastic-*).

spring-webflux-threading-model-yourkit

Threads from bounded elastic are checking free disk space, that is by default a part of Spring Boot Actuator /health endpoint.

spring-webflux-threading-model-diskspace

The post A Deep Dive Into Spring WebFlux Threading Model appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2020/03/30/a-deep-dive-into-spring-webflux-threading-model/feed/ 19 7878
Micronaut Tutorial: Reactive https://piotrminkowski.com/2019/11/12/micronaut-tutorial-reactive/ https://piotrminkowski.com/2019/11/12/micronaut-tutorial-reactive/#respond Tue, 12 Nov 2019 10:17:00 +0000 https://piotrminkowski.wordpress.com/?p=7451 This is the fourth part of my tutorial to Micronaut Framework – created after a longer period of time. In this article I’m going to show you some examples of reactive programming on the server and client side. By default, Micronaut support to reactive APIs and streams is built on top of RxJava. If you […]

The post Micronaut Tutorial: Reactive appeared first on Piotr's TechBlog.

]]>
This is the fourth part of my tutorial to Micronaut Framework – created after a longer period of time. In this article I’m going to show you some examples of reactive programming on the server and client side. By default, Micronaut support to reactive APIs and streams is built on top of RxJava. If you are interested in some previous parts of my tutorial and would like to read it before starting with this part you can learn about basics, security and server-side applications here:

Reactive programming is becoming increasingly popular recently. Therefore, all the newly created web frameworks supports it by default. There is no difference for Micronaut. In this part of tutorial you will learn how to:

  • Use RxJava framework with Micronaut on the server and client side
  • Streaming JSON over HTTP
  • Use low-level HTTP client and declarative HTTP for retrieving reactive stream
  • Regulate back pressure on the server and client side
  • Test reactive API with JUnit

1. Dependencies

Support for reactive programming with RxJava is enabled by default on Micronaut. The dependency io.reactivex.rxjava2:rxjava is included together with some core micronaut libraries like micronaut-runtime or micronaut-http-server-netty on the server side and with micronaut-http-client on the client side. So, the set of dependencies is the same as for example from Part 2 of my tutorial, which has been describing an approach to building classic web application using Micronaut Framework. Just to recap, here’s the list of the most important dependencies in this tutorial:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-inject</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-runtime</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-server-netty</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-management</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-client</artifactId>
   <scope>test</scope>
</dependency>
<dependency>
   <groupId>io.micronaut.test</groupId>
   <artifactId>micronaut-test-junit5</artifactId>
   <scope>test</scope>
</dependency>

2. Controller

Let’s begin from server-side application and a controller implementation. There are some optional annotation for enabling validation (@Validated) and ignoring security constraints (@Secured(SecurityRule.IS_ANONYMOUS)) described in the previous parts of this tutorial. However, the most important thing in the implementation of controller visible below are the RxJava objects used in the return statements: Single, Maybe and Flowable. The rest of implementation is pretty similar to a standard REST controller.

@Controller("/persons/reactive")
@Secured(SecurityRule.IS_ANONYMOUS)
@Validated
public class PersonReactiveController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveController.class);

    List<Person> persons = new ArrayList<>();

    @Post
    public Single<Person> add(@Body @Valid Person person) {
        person.setId(persons.size() + 1);
        persons.add(person);
        return Single.just(person);
    }

    @Get("/{id}")
    public Maybe<Person> findById(@PathVariable Integer id) {
       return Maybe.just(persons.stream().filter(person -> person.getId().equals(id)).findAny().get());
   }

    @Get(value = "/stream", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStream() {
        return Flowable.fromIterable(persons).doOnNext(person -> LOGGER.info("Server: {}", person));
    }

}

In the implementation of controller visible above I used 3 of 5 available RxJava2 types that can be observed:

  • Single – an observable that emits only one item and then completes. It ensures that one item will be sent, so it’s for non empty outputs.
  • Maybe – works very similar to a Single, but with a particular difference: it can complete without emitting a value. This is useful when we have optional emissions.
  • Flowable – it emits a stream of elements and supports back pressure mechanism

Now, if you understand the meaning of basic observable types in RxJava the example controller should become pretty easy for you. We have three methods: add for adding new element into the list, findById for searching by id that may not return any element and findAllStream that emits all elements as a stream. The last method has to produce application/x-json-stream in order to take an advantage of reactive streams also on the client side. When using that type of content type, events are retrieved continuously by the HTTP client thanks to the Flowable type.

3. Using Low-level Reactive HTTP Client

Micronaut Reactive offers two type of clients for accessing HTTP APIs: low-level client and declarative client. We can choose between HttpClient, RxHttpClient and RxStreamingHttpClient with support for streaming data over HTTP. The recommended way for accessing reference to a client is through injecting it with @Client annotation. However, the most suitable way inside JUnit test is with the create static method of the RxHttpClient, since we may dynamically set port number.
Here’s the implementation of JUnit tests with low-level HTTP client. To read a Single or Maybe we are using method retrieve of RxHttpClient. After subscribing to an observable I’m using using ConcurrentUnit library for handling asynchronous results in the test. For accessing Flowable returned as application/x-json-stream on the server side we need to use RxStreamingHttpClient. It provides method jsonStream, which is dedicated for reading a non-blocking stream of JSON objects.

@MicronautTest
public class PersonReactiveControllerTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveControllerTests.class);

    @Inject
    EmbeddedServer server;

    @Test
    public void testAdd() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        final Person person = new Person(null, "Name100", "Surname100", 22, Gender.MALE);
        RxHttpClient client = RxHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        Single<Person> s = client.retrieve(HttpRequest.POST("/persons/reactive", person), Person.class).firstOrError();
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertNotNull(person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindById() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        RxHttpClient client = RxHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        Maybe<Person> s = client.retrieve(HttpRequest.GET("/persons/reactive/1"), Person.class).firstElement();
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertEquals(1, person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindAllStream() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        client.jsonStream(HttpRequest.GET("/persons/reactive/stream"), Person.class)
                .subscribe(s -> {
                    LOGGER.info("Client: {}", s);
                    waiter.assertNotNull(s);
                    waiter.resume();
                });
        waiter.await(3000, TimeUnit.MILLISECONDS, 9);
    }
   
}

4. Back Pressure

One of the main term related to reactive programming is back pressure. Backpressure is resistance or force opposing the desired flow of data through software. In simple words, if a producer sends more events than a consumer is able to handle in a specific period of time, the consumer should be able to regulate the frequency of sending events on the producer side. Of course, Micronaut Reactive supports backpressure. Let’s analyze how we may control it for Micronaut application on some simple examples.
We may “somehow” control back pressure on the server-side and also on the client-side. However, the client is not able to regulate emission parameters on the server side due to the TCP transport layer. Ok, now let’s implement additional methods in our sample controller. I’m using fromCallable for emitting elements within Flowable stream. We are producing 9 elements by calling method repeat method. The second newly created method is findAllStreamWithCallableDelayed, which additionally delay each element on the stream 100 milliseconds. In this way, we can control back pressure on the server-side.

@Controller("/persons/reactive")
@Secured(SecurityRule.IS_ANONYMOUS)
@Validated
public class PersonReactiveController {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveController.class);

    List<Person> persons = new ArrayList<>();
   
    @Get(value = "/stream/callable", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStreamWithCallable() {
        return Flowable.fromCallable(() -> {
            int r = new Random().nextInt(100);
            Person p = new Person(r, "Name"+r, "Surname"+r, r, Gender.MALE);
            return p;
        }).doOnNext(person -> LOGGER.info("Server: {}", person))
                .repeat(9);
    }

    @Get(value = "/stream/callable/delayed", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flowable<Person> findAllStreamWithCallableDelayed() {
        return Flowable.fromCallable(() -> {
            int r = new Random().nextInt(100);
            Person p = new Person(r, "Name"+r, "Surname"+r, r, Gender.MALE);
            return p;
        }).doOnNext(person -> LOGGER.info("Server: {}", person))
                .repeat(9).delay(100, TimeUnit.MILLISECONDS);
    }
   
}

We can also control back pressure on the client-side. However, as I have mentioned before it does not have any effect on the emission on the server-side. Now, let’s consider the following test that verifies the delayed stream on the server-side.

@Test
public void testFindAllStreamDelayed() throws MalformedURLException, TimeoutException, InterruptedException {
   final Waiter waiter = new Waiter();
   RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
   client.jsonStream(HttpRequest.GET("/persons/reactive/stream/callable/delayed"), Person.class)
      .subscribe(s -> {
         LOGGER.info("Client: {}", s);
         waiter.assertNotNull(s);
         waiter.resume();
      });
   waiter.await(3000, TimeUnit.MILLISECONDS, 9);
}

Here’s the result of the test. Since the server delay every element 100 milliseconds, the client receives and prints the element just after emission.

micronaut-tut-4-1

For a comparison let’s consider the following test. This time we are implementing our custom Subscriber that requests a single element just after processing the previous one. The following test verifies not delayed stream.

@Test
public void testFindAllStreamWithCallable() throws MalformedURLException, TimeoutException, InterruptedException {
   final Waiter waiter = new Waiter();
   RxStreamingHttpClient client = RxStreamingHttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
   client.jsonStream(HttpRequest.GET("/persons/reactive/stream/callable"), Person.class)
      .subscribe(new Subscriber<Person>() {

         Subscription s;

         @Override
         public void onSubscribe(Subscription subscription) {
            subscription.request(1);
            s = subscription;
         }

         @Override
         public void onNext(Person person) {
            LOGGER.info("Client: {}", person);
            waiter.assertNotNull(person);
            waiter.resume();
            s.request(1);
         }

         @Override
         public void onError(Throwable throwable) {

         }

         @Override
         public void onComplete() {

         }
      });
   waiter.await(3000, TimeUnit.MILLISECONDS, 9);
}

Here’s the result of our test. It finishes succesfully, so the subscriber works properly. However, as you can see the back pressure is controlled just on the client side, since server emits all the elements, and then client retrieves them.

micronaut-tut-4-2

5. Declarative HTTP Client

Besides low-level HTTP client Micronaut Reactive allows to create declarative clients via the @Client annotation. If you are familiar with for example OpenFeign project or you have read the second part of my tutorial to Micronaut Framework you probably understand the concept of declarative client. In fact, we just need to create an interface with similar methods to the methods defined inside the controller and annotate them properly. The client interface should be annotated with @Client as shown below. The interface is placed inside src/test/java since it is used just in JUnit tests.

@Client("/persons/reactive")
public interface PersonReactiveClient {

    @Post
    Single<Person> add(@Body Person person);

    @Get("/{id}")
    Maybe<Person> findById(@PathVariable Integer id);

    @Get(value = "/stream", produces = MediaType.APPLICATION_JSON_STREAM)
    Flowable<Person> findAllStream();

}

The declarative client need to be injected into the test. Here are the same test methods as implemented for low-level client, but now with declarative client.

@MicronautTest
public class PersonReactiveControllerTests {

    private static final Logger LOGGER = LoggerFactory.getLogger(PersonReactiveControllerTests.class);

    @Inject
    EmbeddedServer server;
    @Inject
    PersonReactiveClient client;

    @Test
    public void testAddDeclarative() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        final Person person = new Person(null, "Name100", "Surname100", 22, Gender.MALE);
        Single<Person> s = client.add(person);
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertNotNull(person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindByIdDeclarative() throws TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Maybe<Person> s = client.findById(1);
        s.subscribe(person1 -> {
            LOGGER.info("Retrieved: {}", person1);
            waiter.assertNotNull(person1);
            waiter.assertEquals(1, person1.getId());
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS);
    }

    @Test
    public void testFindAllStreamDeclarative() throws MalformedURLException, TimeoutException, InterruptedException {
        final Waiter waiter = new Waiter();
        Flowable<Person> persons = client.findAllStream();
        persons.subscribe(s -> {
            LOGGER.info("Client: {}", s);
            waiter.assertNotNull(s);
            waiter.resume();
        });
        waiter.await(3000, TimeUnit.MILLISECONDS, 9);
    }

}

Source Code

We were using the same repository as for two previous parts of my Micronaut tutorial: https://github.com/piomin/sample-micronaut-applications.git.

The post Micronaut Tutorial: Reactive appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2019/11/12/micronaut-tutorial-reactive/feed/ 0 7451
Micronaut Tutorial: Server Application https://piotrminkowski.com/2019/04/23/micronaut-tutorial-server-application/ https://piotrminkowski.com/2019/04/23/micronaut-tutorial-server-application/#comments Tue, 23 Apr 2019 07:30:34 +0000 https://piotrminkowski.wordpress.com/?p=7115 In this part of my tutorial to Micronaut framework we are going to create a simple HTTP server-side application running on Netty. We have already discussed the most interesting core features of Micronaut like beans, scopes or unit testing in the first part of that tutorial. For more details you may refer to my article […]

The post Micronaut Tutorial: Server Application appeared first on Piotr's TechBlog.

]]>
In this part of my tutorial to Micronaut framework we are going to create a simple HTTP server-side application running on Netty. We have already discussed the most interesting core features of Micronaut like beans, scopes or unit testing in the first part of that tutorial. For more details you may refer to my article Micronaut Tutorial: Beans and Scopes.

Assuming we have a basic knowledge about core mechanisms of Micronaut we may proceed to the key part of that framework and discuss how to build a simple microservice application exposing REST API over HTTP.

Embedded Micronaut HTTP Server

First, we need to include dependency to our pom.xml responsible for running an embedded server during application startup. By default, Micronaut starts on Netty server, so we only need to include the following dependency:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-server-netty</artifactId>
</dependency>

Assuming, we have the following main class defined, we only need to run it:

public class MainApp {

    public static void main(String[] args) {
        Micronaut.run(MainApp.class);
    }

}

By default Netty server runs on port 8080. You may override it to force the server to run on a specific port by setting the following property in your application.yml or bootstrap.yml. You can also set the value of this property to -1 to run the server on a randomly generated port.

micronaut:
  server:
    port: 8100

Creating HTTP Web Application

If you are already familiar with Spring Boot you should not have any problems with building a simple REST server-side application using Micronaut. The approach is almost identical. We just have to create a controller class and annotate it with @Controller. Micronaut supports all HTTP method types. You will probably use: @Get, @Post, @Delete, @Put or @Patch. Here’s our sample controller class that implements methods for adding new Person object, finding all persons or a single person by id:

@Controller("/persons")
public class PersonController {

    List<Person> persons = new ArrayList<>();

    @Post
    public Person add(Person person) {
        person.setId(persons.size() + 1);
        persons.add(person);
        return person;
    }

    @Get("/{id}")
    public Optional<Person> findById(Integer id) {
        return persons.stream()
                .filter(it -> it.getId().equals(id))
                .findFirst();
    }

    @Get
    public List<Person> findAll() {
        return persons;
    }

}

Request variables are resolved automatically and bind to the variable with the same name. Micronaut populates methods arguments from URI variables like /{variableName} and GET query parameters like ?paramName=paramValue. If the request contains JSON body you should annotate it with @Body. Our sample controller is very simple. It does not perform any input data validation. Let’s change it.

Validation

To be able to perform HTTP requests validation we should first include the following dependencies to our pom.xml:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-validation</artifactId>
</dependency>
<dependency>
   <groupId>io.micronaut.configuration</groupId>
   <artifactId>micronaut-hibernate-validator</artifactId>
</dependency>

Validation in Micronaut is based on JSR-380, also known as Bean Validation 2.0. We can use javax.validation annotations such as @NotNull, @Min or @Max. Micronaut uses an implementation provided by Hibernate Validator, so even if you don’t use any JPA in your project, you have to include micronaut-hibernate-validator to your dependencies. After that we may add a validation to our model class using some javax.validation annotations. Here’s a Person model class with validation. All the fields are required: firstName and lastName cannot be blank, id cannot be greater than 10000, age cannot be lower than 0.

public class Person {

    @Max(10000)
    private Integer id;
    @NotBlank
    private String firstName;
    @NotBlank
    private String lastName;
    @PositiveOrZero
    private int age;
    @NotNull
    private Gender gender;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }

    public Gender getGender() {
        return gender;
    }

    public void setGender(Gender gender) {
        this.gender = gender;
    }
   
}

Now, we need to modify the code of our controller. First, it needs to be annotated with @Validated. Also @Body parameter of POST method should be annotated with @Valid. The REST method argument may also be validated using JSR-380 annotation. Alternatively, we may configure validation using URI templates. The annotation @Get("/{id:4}") indicates that a variable can contain 4 characters max (is lower than 10000) or a query parameter is optional as shown here: @Get("{?max,offset}").
Here’s the current implementation of our controller. Besides validation, I have also implemented pagination for findAll based on offset and limit optional parameters:

@Controller("/persons")
@Validated
public class PersonController {

    List<Person> persons = new ArrayList<>();

    @Post
    public Person add(@Body @Valid Person person) {
        person.setId(persons.size() + 1);
        persons.add(person);
        return person;
    }

    @Get("/{id:4}")
    public Optional<Person> findById(@NotNull Integer id) {
        return persons.stream()
                .filter(it -> it.getId().equals(id))
                .findFirst();
    }

    @Get("{?max,offset}")
    public List<Person> findAll(@Nullable Integer max, @Nullable Integer offset) {
        return persons.stream()
                .skip(offset == null ? 0 : offset)
                .limit(max == null ? 10000 : max)
                .collect(Collectors.toList());
    }

}

Since we have finished the implementation of our controller, it is the right time to test it.

Testing Micronaut with embedded HTTP server

We have already discussed testing with Micronaut in the first part of my tutorial. The only difference in comparison to those tests is the necessity of running an embedded server and call endpoint via HTTP. To do that we have to include the dependency with Micronaut HTTP client:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-http-client</artifactId>
   <scope>test</scope>
</dependency>

We should also inject an instance of embedded server in order to be able to detect its address (for example if a port number is generated automatically):

@MicronautTest
public class PersonControllerTests {

    @Inject
    EmbeddedServer server;
   
   // tests implementation ...
   
}

We are building Micronaut HTTP Client programmatically by calling static method create. It is also possible to obtain a reference to HttpClient by annotating it with @Client.
The following test implementation is based on JUnit 5. I have provided the positive test for all the exposed endpoints and one negative scenario with not valid input data (age field lower than zero). Micronaut HTTP client can be used in both asynchronous non-blocking mode and synchronous blocking mode. In that case we force it to work in blocking mode.

@MicronautTest
public class PersonControllerTests {

    @Inject
    EmbeddedServer server;

    @Test
    public void testAdd() throws MalformedURLException {
        HttpClient client = HttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        Person person = new Person();
        person.setFirstName("John");
        person.setLastName("Smith");
        person.setAge(33);
        person.setGender(Gender.MALE);
        person = client.toBlocking().retrieve(HttpRequest.POST("/persons", person), Person.class);
        Assertions.assertNotNull(person);
        Assertions.assertEquals(Integer.valueOf(1), person.getId());
    }

    @Test
    public void testAddNotValid() throws MalformedURLException {
        HttpClient client = HttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        final Person person = new Person();
        person.setFirstName("John");
        person.setLastName("Smith");
        person.setAge(-1);
        person.setGender(Gender.MALE);

        Assertions.assertThrows(HttpClientResponseException.class,
                () -> client.toBlocking().retrieve(HttpRequest.POST("/persons", person), Person.class),
                "person.age: must be greater than or equal to 0");
    }

    @Test
    public void testFindById() throws MalformedURLException {
        HttpClient client = HttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        Person person = client.toBlocking().retrieve(HttpRequest.GET("/persons/1"), Person.class);
        Assertions.assertNotNull(person);
    }

    @Test
    public void testFindAll() throws MalformedURLException {
        HttpClient client = HttpClient.create(new URL("http://" + server.getHost() + ":" + server.getPort()));
        Person[] persons = client.toBlocking().retrieve(HttpRequest.GET("/persons"), Person[].class);
        Assertions.assertEquals(1, persons.length);
    }

}

We have already built the simple web application that exposes some methods over REST API, validates input data and includes JUnit API tests. Now, we may discuss some more advanced, interesting Micronaut features. First of them is built-in support for API versioning.

API versioning

Since 1.1, Micronaut supports API versioning via a dedicated @Version annotation. To test this feature we will add a new version of findAll method to our controller class. The new version of this method requires to set input parameters max and offset, which were optional for the first version of the method.

@Version("1")
@Get("{?max,offset}")
public List<Person> findAll(@Nullable Integer max, @Nullable Integer offset) {
   return persons.stream()
         .skip(offset == null ? 0 : offset)
         .limit(max == null ? 10000 : max)
         .collect(Collectors.toList());
}

@Version("2")
@Get("?max,offset")
public List<Person> findAllV2(@NotNull Integer max, @NotNull Integer offset) {
   return persons.stream()
         .skip(offset == null ? 0 : offset)
         .limit(max == null ? 10000 : max)
         .collect(Collectors.toList());
}

Versioning feature is not enabled by default. To do that, you need to set property micronaut.router.versioning.enabled to true in application.yml. We will also set default version to 1, which is compatible with tests created in the previous section that does not use versioning feature:

micronaut:
  router:
    versioning:
      enabled: true
      default-version: 1

Micronaut automatic versioning is supported by a declarative HTTP client. To create such a client we need to define an interface that contains signature of target server-side method, and is annotated with @Client. Here’s declarative client interface responsible only for communicating with version 2 of findAll method:

@Client("/persons")
public interface PersonClient {

    @Version("2")
    @Get("?max,offset")
    List<Person> findAllV2(Integer max, Integer offset);

}

The PersonClient declared above may be injected into the test and used for calling API method exposed by server-side application:


@Inject
PersonClient client;

@Test
public void testFindAllV2() {
   List<Person> persons = client.findAllV2(10, 0);
   Assertions.assertEquals(1, persons.size());
}

API Documentation with Swagger

Micronaut provides built-in support for generating Open API / Swagger YAML documentation at compilation time. We can customize produced documentation using standard Swagger annotations. To enable this support for our application we should add the following swagger-annotations dependency to pom.xml, and enable annotation processing for micronaut-openapi module inside Maven compiler plugin configuration:

<dependency>
   <groupId>io.swagger.core.v3</groupId>
   <artifactId>swagger-annotations</artifactId>
</dependency>
...
<plugin>
   <groupId>org.apache.maven.plugins</groupId>
   <artifactId>maven-compiler-plugin</artifactId>
   <version>3.7.0</version>
   <configuration>
      <source>${jdk.version}</source>
      <target>${jdk.version}</target>
      <compilerArgs>
         <arg>-parameters</arg>
      </compilerArgs>
      <annotationProcessorPaths>
         <path>
            <groupId>io.micronaut</groupId>
            <artifactId>micronaut-inject-java</artifactId>
            <version>${micronaut.version}</version>
         </path>
         <path>
            <groupId>io.micronaut.configuration</groupId>
            <artifactId>micronaut-openapi</artifactId>
            <version>${micronaut.version}</version>
         </path>
      </annotationProcessorPaths>
   </configuration>
   ...
</plugin>

We have to include some basic information to the generated Swagger YAML like application name, description, version number or author name using @OpenAPIDefinition annotation:

@OpenAPIDefinition(
   info = @Info(
      title = "Sample Application",
      version = "1.0",
      description = "Sample API",
      contact = @Contact(url = "https://piotrminkowski.wordpress.com", name = "Piotr Mińkowski", email = "piotr.minkowski@gmail.com")
   )
)
public class MainApp {

    public static void main(String[] args) {
        Micronaut.run(MainApp.class);
    }

}

Micronaut generates the Swagger manifest based on title and version fields inside @Info annotation. In that case our YAML definition file is available under name sample-application-1.0.yml, and will be generated to the META-INF/swagger directory. We can expose it outside the application using HTTP endpoint. Here’s the appropriate configuration provided inside application.yml file.

micronaut:
  static-resources:
    swagger:
     paths: classpath:META-INF/swagger
     mapping: /swagger/**

Assuming our application is running on port 8100 Swagger definition is available under the path http://localhost:8100/swagger/sample-application-1.0.yml. You can call this endpoint and copy the response to any Swagger editor as shown below.

micronaut-6

Management and Monitoring Endpoints

Micronaut provides some built-in HTTP endpoints used for management and monitoring. To enable them for the application we first need to include the following dependency:

<dependency>
   <groupId>io.micronaut</groupId>
   <artifactId>micronaut-management</artifactId>
</dependency>

There are no endpoints exposed by default outside application. If you would like to expose them all you should set property endpoints.all.enabled to true. Alternatively, you can enable or disable the single endpoint just by using its id instead of all in the name of property. Also, some of built-in endpoints require authentication, and some not. You may enable/disable it for all endpoints using property endpoints.all.enabled. The following configuration inside application.yaml enables all built-in endpoints except stop endpoints using for graceful shutdown of application, and disables authentication for all the enabled endpoints:

endpoints:
  all:
    enabled: true
    sensitive: false
  stop:
    enabled: false

You may use one of the following:

  • GET /beans – returns information about the loaded bean definitions
  • GET /info – returns static information from the state of the application
  • GET /health – exposes “healthcheck”
  • POST /refresh – it is refresh the application state, all the beans annotated with @Refreshable will be reloaded
  • GET /routes – returns information about URIs exposed by the application
  • GET /logger – returns information about the available loggers
  • GET /caches – returns information about the caches
  • POST /stop – it shuts down the application server

Summary

In this tutorial you have learned how to:

  • Build a simple application that exposes some HTTP endpoints
  • Validate input data inside controller
  • Test your controller with JUnit 5 on embedded Netty using Micronaut HTTP client
  • Use built-in API versioning
  • Generate Swagger API documentation automatically
  • Using build-in management and monitoring endpoints

The first part of my tutorial is available here: https://piotrminkowski.wordpress.com/2019/04/15/micronaut-tutorial-beans-and-scopes/. It uses the same repository as the current part: https://github.com/piomin/sample-micronaut-applications.git.

The post Micronaut Tutorial: Server Application appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2019/04/23/micronaut-tutorial-server-application/feed/ 3 7115
Reactive Microservices with Spring WebFlux and Spring Cloud https://piotrminkowski.com/2018/05/04/reactive-microservices-with-spring-webflux-and-spring-cloud/ https://piotrminkowski.com/2018/05/04/reactive-microservices-with-spring-webflux-and-spring-cloud/#comments Fri, 04 May 2018 09:32:45 +0000 https://piotrminkowski.wordpress.com/?p=6475 I have already described Spring reactive support about one year ago in the article Reactive microservices with Spring 5. At that time the project Spring WebFlux was under active development. Now after the official release of Spring 5 it is worth to take a look at the current version of it. Moreover, we will try […]

The post Reactive Microservices with Spring WebFlux and Spring Cloud appeared first on Piotr's TechBlog.

]]>
I have already described Spring reactive support about one year ago in the article Reactive microservices with Spring 5. At that time the project Spring WebFlux was under active development. Now after the official release of Spring 5 it is worth to take a look at the current version of it. Moreover, we will try to put our reactive microservices inside Spring Cloud ecosystem, which contains such the elements like service discovery with Eureka, load balancing with Spring Cloud Commons @LoadBalanced, and API gateway using Spring Cloud Gateway (also based on WebFlux and Netty). We will also check out Spring reactive support for NoSQL databases by the example of Spring Data Reactive Mongo project.

Here’s the figure that illustrates an architecture of our sample system consisting of two microservices, discovery server, gateway and MongoDB databases. The source code is as usual available on GitHub in sample-spring-cloud-webflux repository.

reactive-1

Let’s describe the further steps on the way to create the system illustrated above.

Step 1. Building reactive application using Spring WebFlux

To enable library Spring WebFlux for the project we should include starter spring-boot-starter-webflux to the dependencies. It includes some dependent libraries like Reactor or Netty server.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>

REST controller looks pretty similar to the controller defined for synchronous web services. The only difference is in type of returned objects. Instead of a single object we return an instance of class Mono, and instead of list we return instance of class Flux. Thanks to Spring Data Reactive Mongo we don’t have to do anything more that call the needed method on the repository bean.


@RestController
public class AccountController {

   private static final Logger LOGGER = LoggerFactory.getLogger(AccountController.class);

   @Autowired
   private AccountRepository repository;

   @GetMapping("/customer/{customer}")
   public Flux findByCustomer(@PathVariable("customer") String customerId) {
      LOGGER.info("findByCustomer: customerId={}", customerId);
      return repository.findByCustomerId(customerId);
   }

   @GetMapping
   public Flux findAll() {
      LOGGER.info("findAll");
      return repository.findAll();
   }

   @GetMapping("/{id}")
   public Mono findById(@PathVariable("id") String id) {
   LOGGER.info("findById: id={}", id);
      return repository.findById(id);
   }

   @PostMapping
   public Mono create(@RequestBody Account account) {
      LOGGER.info("create: {}", account);
      return repository.save(account);
   }

}

Step 2. Integrate an application with database using Spring Data Reactive Mongo

The implementation of integration between application and database is also very simple. First, we need to include starter spring-boot-starter-data-mongodb-reactive to the project dependencies.

<dependency>
  <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
</dependency>

The support for reactive Mongo repositories is automatically enabled after including the starter. The next step is to declare entity with ORM mappings. The following class is also returned as reponse by AccountController.

@Document
public class Account {

   @Id
   private String id;
   private String number;
   private String customerId;
  private int amount;

   ...

}

Finally, we may create a repository interface that extends ReactiveCrudRepository. It follows the patterns implemented by Spring Data JPA and provides some basic methods for CRUD operations. It also allows you to define methods with names, which are automatically mapped to queries. The only difference in comparison with standard Spring Data JPA repositories is in method signatures. The objects are wrapped by Mono and Flux.

public interface AccountRepository extends ReactiveCrudRepository {

   Flux findByCustomerId(String customerId);

}

In this example I used Docker container for running MongoDB locally. Because I run Docker on Windows using Docker Toolkit the default address of Docker machine is 192.168.99.100. Here’s the configuration of data source in application.yml file.

spring:
  data:
    mongodb:
      uri: mongodb://192.168.99.100/test

Step 3. Enabling service discovery using Eureka

Integration with Spring Cloud Eureka is pretty the same as for synchronous REST microservices. To enable discovery client we should first include starter spring-cloud-starter-netflix-eureka-client to the project dependencies.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

Then we have to enable it using @EnableDiscoveryClient annotation.

@SpringBootApplication
@EnableDiscoveryClient
public class AccountApplication {

   public static void main(String[] args) {
      SpringApplication.run(AccountApplication.class, args);
   }

}

Microservice will automatically register itself in Eureka. Of course, we may run more than one instance of every service. Here’s the screen illustrating Eureka Dashboard (http://localhost:8761) after running two instances of account-service and a single instance of customer-service.  I would not like to go into the details of running application with embedded Eureka server. You may refer to my previous article for details: Quick Guide to Microservices with Spring Boot 2.0, Eureka and Spring Cloud. Eureka server is available as discovery-service module.

spring-reactive

Step 4. Inter-service communication between reactive microservices with WebClient

An inter-service communication is realized by the WebClient from Spring WebFlux project. The same as for RestTemplate you should annotate it with Spring Cloud Commons @LoadBalanced . It enables integration with service discovery and load balancing using Netflix OSS Ribbon client. So, the first step is to declare a client builder bean with @LoadBalanced annotation.

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() {
   return WebClient.builder();
}

Then we may inject WebClientBuilder into the REST controller. Communication with account-service is implemented inside GET /{id}/with-accounts , where first we are searching for customer entity using reactive Spring Data repository. It returns object Mono , while the WebClient returns Flux . Now, our main goal is to merge those to publishers and return single Mono object with the list of accounts taken from Flux without blocking the stream. The following fragment of code illustrates how I used WebClient to communicate with other microservice, and then merge the response and result from repository to single Mono object. This merge may probably be done in more “elegant” way, so feel free to create push request with your proposal.

@Autowired
private WebClient.Builder webClientBuilder;

@GetMapping("/{id}/with-accounts")
public Mono findByIdWithAccounts(@PathVariable("id") String id) {
   LOGGER.info("findByIdWithAccounts: id={}", id);
   Flux accounts = webClientBuilder.build().get().uri("http://account-service/customer/{customer}", id).retrieve().bodyToFlux(Account.class);
   return accounts
      .collectList()
      .map(a -> new Customer(a))
      .mergeWith(repository.findById(id))
      .collectList()
      .map(CustomerMapper::map);
}

Step 5. Building API gateway using Spring Cloud Gateway

Spring Cloud Gateway is one of the newest Spring Cloud projects. It is built on top of Spring WebFlux, and thanks to that we may use it as a gateway to our sample system based on reactive microservices with Spring Boot. Similar to Spring WebFlux applications it is run on an embedded Netty server. To enable it for the Spring Boot application just include the following dependency to your project.

<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-gateway</artifactId>
</dependency>

We should also enable a discovery client in order to allow the gateway to fetch a list of registered microservices. However, there is no need to register a gateway application in Eureka. To disable registration you may set property eureka.client.registerWithEureka to false inside application.yml file.

@SpringBootApplication
@EnableDiscoveryClient
public class GatewayApplication {

   public static void main(String[] args) {
      SpringApplication.run(GatewayApplication.class, args);
   }

}

By default, Spring Cloud Gateway does not enable integration with service discovery. To enable it we should set property spring.cloud.gateway.discovery.locator.enabled to true. Now, the last thing that should be done is the configuration of the routes. Spring Cloud Gateway provides two types of components that may be configured inside routes: filters and predicates. Predicates are used for matching HTTP requests with the route, while filters can be used to modify requests and responses before or after sending the downstream request. Here’s the full configuration of gateway. It enables service discovery location, and defines two routes based on entries in service registry. We use the Path Route Predicate factory for matching the incoming requests, and the RewritePath GatewayFilter factory for modifying the requested path to adapt it to the format exposed by the downstream services (endpoints are exposed under path /, while gateway expose them under paths /account and /customer).

spring:
  cloud:
    gateway:
      discovery:
        locator:
          enabled: true
      routes:
      - id: account-service
        uri: lb://account-service
        predicates:
        - Path=/account/**
        filters:
        - RewritePath=/account/(?.*), /$\{path}
      - id: customer-service
        uri: lb://customer-service
        predicates:
        - Path=/customer/**
        filters:
        - RewritePath=/customer/(?.*), /$\{path}

Step 6. Testing reactive microservices with Spring Boot

Before making some tests let’s just recap our sample system. We have two microservices account-service, customer-service that use MongoDB as a database. Microservice customer-service calls endpoint GET /customer/{customer} exposed by account-service. The URL of account-service is taken from Eureka. The whole sample system is hidden behind the gateway, which is available under address localhost:8090.
Now, the first step is to run MongoDB on a Docker container. After executing the following command Mongo is available under address 192.168.99.100:27017.

$ docker run -d --name mongo -p 27017:27017 mongo

Then we may proceed to running discovery-service. Eureka is available under its default address localhost:8761. You may run it using your IDE or just by executing command java -jar target/discovery-service-1.0-SNAPHOT.jar. The same rule applies to our sample microservices. However, account-service needs to be multiplied in two instances, so you need to override default HTTP port when running second instance using -Dserver.port VM argument, for example java -jar -Dserver.port=2223 target/account-service-1.0-SNAPSHOT.jar. Finally, after running gateway-service we may add some test data.

$ curl --header "Content-Type: application/json" --request POST --data '{"firstName": "John","lastName": "Scott","age": 30}' http://localhost:8090/customer
{"id": "5aec1debfa656c0b38b952b4","firstName": "John","lastName": "Scott","age": 30,"accounts": null}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567890","amount": 5000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e86fa656c11d4c655fb","number": "1234567892","customerId": "5aec1debfa656c0b38b952b4","amount": 5000}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567891","amount": 12000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e91fa656c11d4c655fc","number": "1234567892","customerId": "5aec1debfa656c0b38b952b4","amount": 12000}
$ curl --header "Content-Type: application/json" --request POST --data '{"number": "1234567892","amount": 2000,"customerId": "5aec1debfa656c0b38b952b4"}' http://localhost:8090/account
{"id": "5aec1e99fa656c11d4c655fd","number": "1234567892","customerId": "5aec1debfa656c0b38b952b4","amount": 2000}

To test inter-service communication just call endpoint GET /customer/{id}/with-accounts on gateway-service. It forwards the request to customer-service, and then customer-service calls endpoint exposed by account-service using reactive WebClient. The result is visible below.

reactive-2

Conclusion

Since Spring 5 and Spring Boot 2.0 there is a full range of available ways to build microservices-based architecture. We can build standard synchronous system using one-to-one communication with Spring Cloud Netflix project, messaging microservices based on message broker and publish/subscribe communication model with Spring Cloud Stream, and finally asynchronous, reactive microservices with Spring WebFlux. The main goal of this article is to show you how to use Spring WebFlux together with Spring Cloud projects in order to provide such mechanisms like service discovery, load balancing or API gateway for reactive microservices built on top of Spring Boot. Before Spring 5 the lack of support for reactive microservices Spring Boot support was one of the drawbacks of Spring framework, but now with Spring WebFlux it is no longer the case. Not only that, we may leverage Spring reactive support for the most popular NoSQL databases like MongoDB or Cassandra, and easily place our reactive microservices inside one system together with synchronous REST microservices.

The post Reactive Microservices with Spring WebFlux and Spring Cloud appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2018/05/04/reactive-microservices-with-spring-webflux-and-spring-cloud/feed/ 4 6475