Reactive Spring Boot with WebFlux, R2DBC and Postgres
Piotr's TechBlog, Fri, 28 Jul 2023
https://piotrminkowski.com/2023/07/28/reactive-spring-boot-with-webflux-r2dbc-and-postgres/

In this article, you will learn how to implement and test reactive Spring Boot apps using Spring WebFlux, R2DBC, and a Postgres database. We will create two simple apps written in Kotlin using the latest version of Spring Boot 3. Our apps expose some REST endpoints over HTTP. To test the communication between them and the integration with the Postgres database, we will use Testcontainers and the Netty module of MockServer.

If you are looking for more guides to Spring Boot 3, you can look at other posts on my blog. One of them describes how to build a microservices architecture with Spring Boot 3 and Spring Cloud. Another covers the latest changes in observability with Spring Boot 3 and shows how to integrate your app with the Grafana Stack. Of course, these are just a few examples; you can find more content in this area on my blog.

Source Code

If you would like to try it yourself, you can always take a look at my source code. To do that, clone my GitHub repository. It contains two apps inside the employee-service and organization-service directories. After that, just follow my instructions.

Dependencies

In the first step, we will add several dependencies related to Kotlin. Besides the standard libraries, we can include Kotlin support for Jackson (JSON serialization/deserialization):

<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.module</groupId>
  <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
  <groupId>org.jetbrains.kotlin</groupId>
  <artifactId>kotlin-reflect</artifactId>
</dependency>

We also need to include two Spring Boot Starters. To create a reactive Spring @Controller, we need the Spring WebFlux module. With the Spring Boot Data R2DBC Starter we can use Spring Data Repositories in a reactive way. Finally, we have to include the R2DBC driver for Postgres.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-r2dbc</artifactId>
</dependency>
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <scope>runtime</scope>
</dependency>

There are several testing dependencies in our project. We need to include the standard Spring Boot Test Starter, Testcontainers with JUnit 5, Postgres, and R2DBC support, and finally the MockServer Netty module for mocking the reactive API. It is also worth adding the spring-boot-testcontainers module to take advantage of the built-in integration between Spring Boot and Testcontainers.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-test</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>r2dbc</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>postgresql</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.testcontainers</groupId>
  <artifactId>junit-jupiter</artifactId>
  <version>1.18.3</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-testcontainers</artifactId>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.mock-server</groupId>
  <artifactId>mockserver-netty</artifactId>
  <version>5.15.0</version>
  <scope>test</scope>
</dependency>

The last dependency is optional. We can include Spring Boot Actuator in our apps. It adds R2DBC connection status to health checks and several metrics with the pool status.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

Implement the Spring Reactive App

Here’s our model class for the first app – employee-service:

class Employee(val name: String, 
               val salary: Int, 
               val organizationId: Int) {
    @Id var id: Int? = null
}

Here’s the repository interface. It needs to extend the R2dbcRepository interface. As with standard Spring Data Repositories, we can define several find methods. However, instead of returning entities directly, they return them wrapped in a Reactor Mono or Flux.

interface EmployeeRepository: R2dbcRepository<Employee, Int> {
    fun findByOrganizationId(organizationId: Int): Flux<Employee>
}
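
To make the Mono and Flux return types more concrete, here is a minimal sketch that uses plain Reactor without any database. SampleEmployee and InMemoryEmployees are hypothetical stand-ins invented for this illustration; they are not part of the project. A Flux represents zero or more results, a Mono zero or one, and nothing is executed until something subscribes (in a WebFlux app the framework subscribes for you).

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical stand-in mirroring the shape of the Employee entity (illustration only).
data class SampleEmployee(val id: Int, val name: String, val salary: Int, val organizationId: Int)

// Hypothetical in-memory replacement for EmployeeRepository; no database is involved.
class InMemoryEmployees(private val data: List<SampleEmployee>) {
    // Zero or more results -> Flux
    fun findByOrganizationId(organizationId: Int): Flux<SampleEmployee> =
        Flux.fromIterable(data).filter { it.organizationId == organizationId }

    // Zero or one result -> Mono
    fun findById(id: Int): Mono<SampleEmployee> =
        Mono.justOrEmpty(data.firstOrNull { it.id == id })
}

fun main() {
    val employees = InMemoryEmployees(listOf(
        SampleEmployee(1, "Test1", 10000, 1),
        SampleEmployee(2, "Test2", 20000, 1),
        SampleEmployee(3, "Test3", 30000, 2)
    ))

    // Building the pipeline only describes the work; it does not run yet.
    val names: Flux<String> = employees.findByOrganizationId(1).map { it.name }

    // block() subscribes and waits. That is fine in a demo, but should be avoided in reactive handlers.
    println(names.collectList().block())    // [Test1, Test2]
    println(employees.findById(42).block()) // null
}
```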

Here’s the implementation of our @RestController. We need to inject the EmployeeRepository bean. Then we use the repository bean to interact with the database in a reactive way. Our endpoints also return objects wrapped by the Reactor Mono and Flux. There are three find endpoints and a single POST endpoint:

  • Searching all the employees (1)
  • Searching by the employee id (2)
  • Searching all employees by the organization id (3)
  • Adding new employees (4)

@RestController
@RequestMapping("/employees")
class EmployeeController {

   @Autowired
   lateinit var repository : EmployeeRepository

   @GetMapping // (1)
   fun findAll() : Flux<Employee> = repository.findAll()

   @GetMapping("/{id}") // (2)
   fun findById(@PathVariable id: Int) : Mono<Employee> = 
      repository.findById(id)

   @GetMapping("/organization/{organizationId}") // (3)
   fun findByOrganizationId(@PathVariable organizationId: Int): 
      Flux<Employee> = repository.findByOrganizationId(organizationId)

   @PostMapping // (4)
   fun add(@RequestBody employee: Employee) : Mono<Employee> = 
      repository.save(employee)

}

We also need to configure database connection settings in the Spring Boot application.yml:

spring:
  application:
    name: employee-service
  r2dbc:
    url: r2dbc:postgresql://localhost:5432/spring
    username: spring
    password: spring123

Here’s our main class. We want our app to create a table in the database on startup. With R2DBC we need to prepare a fragment of code for populating the schema with the schema.sql file.

@SpringBootApplication
class EmployeeApplication {

   @Bean
   fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer {
      val initializer = ConnectionFactoryInitializer()
      initializer.setConnectionFactory(connectionFactory)
      initializer.setDatabasePopulator(
         ResourceDatabasePopulator(ClassPathResource("schema.sql")))
      return initializer
   }

}

fun main(args: Array<String>) {
   runApplication<EmployeeApplication>(*args)
}

Then just place the schema.sql file in the src/main/resources directory.

CREATE TABLE employee (
  id SERIAL PRIMARY KEY, 
  name VARCHAR(255), 
  salary INT, 
  organization_id INT
);

Let’s switch to the organization-service. The implementation is pretty similar. Here’s our domain model class:

class Organization(var name: String) {
    @Id var id: Int? = null
}

Our app is communicating with the employee-service. Therefore, we need to define the WebClient bean. It gets the address of the target service from application properties.

@SpringBootApplication
class OrganizationApplication {

   @Bean
   fun initializer(connectionFactory: ConnectionFactory): ConnectionFactoryInitializer {
      val initializer = ConnectionFactoryInitializer()
      initializer.setConnectionFactory(connectionFactory)
      initializer.setDatabasePopulator(
         ResourceDatabasePopulator(ClassPathResource("schema.sql")))
      return initializer
   }

   @Value("\${employee.client.url}")
   private lateinit var employeeUrl: String

   @Bean
   fun webClient(builder: WebClient.Builder): WebClient {
      return builder.baseUrl(employeeUrl).build()
   }
}

fun main(args: Array<String>) {
    runApplication<OrganizationApplication>(*args)
}

There is also the repository interface OrganizationRepository. Our @RestController uses the repository bean to interact with the database and the WebClient bean to call the endpoint exposed by the employee-service. The GET /organizations/{id}/with-employees endpoint returns an OrganizationDTO as its response.

@RestController
@RequestMapping("/organizations")
class OrganizationController {

   @Autowired
   lateinit var repository : OrganizationRepository
   @Autowired
   lateinit var client : WebClient

   @GetMapping
   fun findAll() : Flux<Organization> = repository.findAll()

   @GetMapping("/{id}")
   fun findById(@PathVariable id : Int): Mono<Organization> = 
      repository.findById(id)

   @GetMapping("/{id}/with-employees")
   fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
      val employees : Flux<Employee> = client.get().uri("/employees/organization/$id")
             .retrieve().bodyToFlux(Employee::class.java)
      val org : Mono<Organization> = repository.findById(id)
      return org.zipWith(employees.collectList()).log()
             .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

   @PostMapping
   fun add(@RequestBody organization: Organization) : Mono<Organization> = 
      repository.save(organization)

}
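
The zipWith step in findByIdWithEmployees can be tried in isolation. The sketch below uses plain Reactor with hard-coded values; Org, Emp, OrgWithEmployees and combine are hypothetical names invented for this illustration, not classes from the project. It also shows a consequence worth knowing: if the organization Mono is empty (no row with that id), zipWith produces an empty Mono and the map step never runs.

```kotlin
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Hypothetical simplified types, for illustration only.
data class Org(val id: Int, val name: String)
data class Emp(val id: Int, val name: String)
data class OrgWithEmployees(val id: Int, val name: String, val employees: List<Emp>)

// Same shape as the controller method: zip a Mono with a Flux collected into a list.
fun combine(org: Mono<Org>, employees: Flux<Emp>): Mono<OrgWithEmployees> =
    org.zipWith(employees.collectList())
        .map { tuple -> OrgWithEmployees(tuple.t1.id, tuple.t1.name, tuple.t2) }

fun main() {
    val employees = Flux.just(Emp(1, "Test1"), Emp(2, "Test2"))

    // Organization found: both sources emit, so the tuple is mapped to the combined object.
    println(combine(Mono.just(Org(1, "Test")), employees).block())

    // Organization missing: the zipped Mono completes empty and block() returns null.
    println(combine(Mono.empty(), employees).block())
}
```

If an explicit error is preferred when the organization does not exist, one option is Reactor's switchIfEmpty operator placed after the map step; whether that fits depends on how the API should behave.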

Here’s the implementation of our DTO:

data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList<Employee> = ArrayList()
    constructor(employees: MutableList<Employee>) : this(null, "") {
        this.employees = employees
    }
    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name) {
        this.employees = employees
    }
}

Testing with Integrations

Once we have finished the implementation, we can prepare several integration tests. As I mentioned at the beginning, we will use Testcontainers to run the Postgres container during the tests. Our test runs the app and leverages the auto-configured instance of WebTestClient to call the API endpoints (1). We need to start the Postgres container before the tests, so we define the container inside the companion object section (2). With the @ServiceConnection annotation we don’t have to set the connection properties manually; Spring Boot does it for us (3).

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class EmployeeControllerTests {

   @Autowired
   private lateinit var webTestClient: WebTestClient // (1)

   companion object { // (2)

      @Container
      @ServiceConnection // (3)
      val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
         withDatabaseName("spring")
         withUsername("spring")
         withPassword("spring123")
      }

   }

   @Test
   @Order(1)
   fun shouldStart() {

   }

   @Test
   @Order(2)
   fun shouldAddEmployee() {
      webTestClient.post().uri("/employees")
          .contentType(MediaType.APPLICATION_JSON)
          .bodyValue(Employee("Test", 1000, 1))
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindEmployee() {
      webTestClient.get().uri("/employees/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(4)
   fun shouldFindEmployees() {
      webTestClient.get().uri("/employees")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody().jsonPath("$.length()").isEqualTo(1)
          .jsonPath("$[0].id").isNotEmpty
   }

}

The test class for the organization-service is a little bit more complicated. That’s because we need to mock the communication with the employee-service. In order to do that we use the ClientAndServer object (1). It is started once before all the tests (2) and stopped after the tests (3). We are mocking the GET /employees/organization/{id} endpoint, which is invoked by the organization-service (4). Then we are calling the organization-service GET /organizations/{id}/with-employees endpoint (5). Finally, we are verifying if it returns the list of employees inside the JSON response.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@Testcontainers
@TestMethodOrder(OrderAnnotation::class)
public class OrganizationControllerTests {

   @Autowired
   private lateinit var webTestClient: WebTestClient

   companion object {

      @Container
      @ServiceConnection
      val container = PostgreSQLContainer<Nothing>("postgres:14").apply {
         withDatabaseName("spring")
         withUsername("spring")
         withPassword("spring123")
      }

      private var mockServer: ClientAndServer? = null // (1)

      @BeforeAll
      @JvmStatic
      internal fun beforeAll() { // (2)
         mockServer = ClientAndServer.startClientAndServer(8090)
      }

      @AfterAll
      @JvmStatic
      internal fun afterAll() { // (3)
         mockServer!!.stop()
      }
   }

   @Test
   @Order(1)
   fun shouldStart() {

   }

   @Test
   @Order(2)
   fun shouldAddOrganization() {
        webTestClient.post().uri("/organizations")
          .contentType(MediaType.APPLICATION_JSON)
          .bodyValue(Organization("Test"))
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(3)
   fun shouldFindOrganization() {
        webTestClient.get().uri("/organizations/1")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
   }

   @Test
   @Order(4)
   fun shouldFindOrganizations() {
        webTestClient.get().uri("/organizations")
          .accept(MediaType.APPLICATION_JSON)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody().jsonPath("$.length()").isEqualTo(1)
          .jsonPath("$[0].id").isNotEmpty
   }

   @Test
   @Order(5)
   fun shouldFindOrganizationWithEmployees() { // (4)
        mockServer!!.`when`(request()
               .withMethod("GET")
               .withPath("/employees/organization/1"))
            .respond(response()
                .withStatusCode(200)
                .withContentType(org.mockserver.model.MediaType.APPLICATION_JSON) // MockServer's MediaType, not Spring's
                .withBody(createEmployees()))

      webTestClient.get().uri("/organizations/1/with-employees")
          .accept(MediaType.APPLICATION_JSON) // (5)
          .exchange()
          .expectStatus().is2xxSuccessful
          .expectBody()
          .jsonPath("$.id").isNotEmpty
          .jsonPath("$.employees.length()").isEqualTo(2)
          .jsonPath("$.employees[0].id").isEqualTo(1)
          .jsonPath("$.employees[1].id").isEqualTo(2)
   }

   private fun createEmployees(): String {
      val employees: List<Employee> = listOf<Employee>(
         Employee(1, "Test1", 10000, 1),
         Employee(2, "Test2", 20000, 1)
      )
      return jacksonObjectMapper().writeValueAsString(employees)
   }
}

You can easily verify that all the tests finish successfully by running them on your laptop. After cloning the repository, you need to start Docker and build the apps with the following Maven command:

$ mvn clean package

We can also prepare the build definition for our apps on CircleCI. Since we need to run Testcontainers, we need a machine with a Docker daemon. Here’s the configuration of the build pipeline for CircleCI inside the .circleci/config.yml file:

version: 2.1

jobs:
  build:
    docker:
      - image: 'cimg/openjdk:20.0'
    steps:
      - checkout
      - run:
          name: Analyze on SonarCloud
          command: mvn verify sonar:sonar -DskipTests

executors:
  machine_executor_amd64:
    machine:
      image: ubuntu-2204:2022.04.2
    environment:
      architecture: "amd64"
      platform: "linux/amd64"

orbs:
  maven: circleci/maven@1.4.1

workflows:
  maven_test:
    jobs:
      - maven/test:
          executor: machine_executor_amd64
      - build:
          context: SonarCloud

Here’s the result of the build on CircleCI:

[Image: spring-boot-reactive-postgres-circleci]

If you have Docker running you can also start our Spring Boot reactive apps with the Postgres container. It is possible thanks to the spring-boot-testcontainers module. There is a dedicated @TestConfiguration class that may be used to run Postgres in dev mode:

@TestConfiguration
class PostgresContainerDevMode {

    @Bean
    @ServiceConnection
    fun postgresql(): PostgreSQLContainer<*>? {
        return PostgreSQLContainer("postgres:14.0")
            .withUsername("spring")
            .withPassword("spring123")
    }
}

Now, we need to define the “test” main class that uses the configuration provided within the PostgresContainerDevMode class.

class EmployeeApplicationTest

fun main(args: Array<String>) {
    fromApplication<EmployeeApplication>()
       .with(PostgresContainerDevMode::class)
       .run(*args)
}

To run the app in dev mode with Postgres on Docker, just execute the following Maven command:

$ mvn spring-boot:test-run

Introduction to Reactive APIs with Postgres, R2DBC, Spring Data JDBC and Spring WebFlux
Piotr's TechBlog, Thu, 18 Oct 2018
https://piotrminkowski.com/2018/10/18/introduction-to-reactive-apis-with-postgres-r2dbc-spring-data-jdbc-and-spring-webflux/

There are quite a few technologies listed in the title of this article. Spring WebFlux was introduced with Spring 5 and Spring Boot 2 as a project for building reactive-stack web applications. I have already described how to use it together with Spring Boot and Spring Cloud for building reactive microservices in the article Reactive Microservices with Spring WebFlux and Spring Cloud. Spring 5 also introduced some projects supporting reactive access to NoSQL databases like Cassandra, MongoDB or Couchbase. But there was still a lack of support for reactive access to relational databases. That is changing with the R2DBC (Reactive Relational Database Connectivity) project, which is also being developed by Pivotal members. It seems to be a very interesting initiative, although it is still at the beginning of the road. Anyway, there is a module for integration with Postgres, and we will use it for our demo application. R2DBC is not the only interesting new solution described in this article. I will also show you how to use Spring Data JDBC, another really interesting project released recently.

It is worth saying a few words about Spring Data JDBC. This project has already been released and is available in version 1.0. It is part of the bigger Spring Data framework and offers a repository abstraction based on JDBC. The main reason for creating that library is to allow access to relational databases the Spring Data way (through CrudRepository interfaces) without adding a JPA library to the application dependencies. Of course, JPA is still the main persistence API used in Java applications. Spring Data JDBC aims to be conceptually much simpler than JPA by not implementing popular patterns like lazy loading, caching, dirty tracking, or sessions. It also provides only very limited support for annotation-based mapping. Finally, it provides an implementation of reactive repositories that uses R2DBC for accessing a relational database. Although that module is still under development (only a SNAPSHOT version is available), we will try to use it in our demo application. Let’s proceed to the implementation.

Including dependencies

We use Kotlin for implementation. So first, we include some required Kotlin dependencies.

<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-stdlib</artifactId>
   <version>${kotlin.version}</version>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.module</groupId>
   <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-test-junit</artifactId>
   <version>${kotlin.version}</version>
   <scope>test</scope>
</dependency>

We should also add kotlin-maven-plugin with support for Spring.


<plugin>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-maven-plugin</artifactId>
   <version>${kotlin.version}</version>
   <executions>
      <execution>
         <id>compile</id>
         <phase>compile</phase>
         <goals>
            <goal>compile</goal>
         </goals>
      </execution>
      <execution>
         <id>test-compile</id>
         <phase>test-compile</phase>
         <goals>
            <goal>test-compile</goal>
         </goals>
      </execution>
   </executions>
   <configuration>
      <args>
         <arg>-Xjsr305=strict</arg>
      </args>
      <compilerPlugins>
         <plugin>spring</plugin>
      </compilerPlugins>
   </configuration>
</plugin>

Then, we can proceed to including the frameworks required for the demo implementation. We need to include the special SNAPSHOT version of Spring Data JDBC dedicated to accessing databases using R2DBC. We also have to add some R2DBC libraries and Spring WebFlux. As you can see below, only Spring WebFlux is available in a stable version (as a part of the Spring Boot RELEASE).

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.data</groupId>
   <artifactId>spring-data-jdbc</artifactId>
   <version>1.0.0.r2dbc-SNAPSHOT</version>
</dependency>
<dependency>
   <groupId>io.r2dbc</groupId>
   <artifactId>r2dbc-spi</artifactId>
   <version>1.0.0.M5</version>
</dependency>
<dependency>
   <groupId>io.r2dbc</groupId>
   <artifactId>r2dbc-postgresql</artifactId>
   <version>1.0.0.M5</version>
</dependency>

It is also important to set dependency management for Spring Data project.


<dependencyManagement>
   <dependencies>
      <dependency>
         <groupId>org.springframework.data</groupId>
         <artifactId>spring-data-releasetrain</artifactId>
         <version>Lovelace-RELEASE</version>
         <scope>import</scope>
         <type>pom</type>
      </dependency>
   </dependencies>
</dependencyManagement>

Repositories

We are using the well-known Spring Data style of CRUD repository implementation. In this case we need to create an interface that extends the ReactiveCrudRepository interface.
Here’s the implementation of a repository for managing Employee objects.

interface EmployeeRepository : ReactiveCrudRepository<Employee, Int> {
    @Query("select id, name, salary, organization_id from employee e where e.organization_id = $1")
    fun findByOrganizationId(organizationId: Int) : Flux<Employee>
}

Here’s another implementation of repository – this time for managing Organization objects.

interface OrganizationRepository : ReactiveCrudRepository<Organization, Int> 

Implementing Entities and DTOs

Kotlin provides a convenient way of creating entity classes by declaring them as data classes. When using Spring Data JDBC we have to set the primary key for the entity by annotating the field with @Id. It assumes the key is automatically incremented by the database. If you are not using auto-increment columns, you have to use a BeforeSaveEvent listener, which sets the ID of the entity. I tried to set such a listener for my entity, but it just didn’t work with the reactive version of Spring Data JDBC.
Here’s an implementation of the Employee entity class. It is worth mentioning that Spring Data JDBC automatically maps the class field organizationId to the database column organization_id.

data class Employee(val name: String, val salary: Int, val organizationId: Int) {
    @Id 
    var id: Int? = null
}
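
The organizationId to organization_id naming convention mentioned above can be illustrated with a few lines of plain Kotlin. This is only a sketch of the naming rule, not the code Spring Data uses internally; toSnakeCase is a hypothetical helper invented for this example.

```kotlin
// Hypothetical helper that illustrates the naming convention only;
// Spring Data applies its own naming strategy internally.
fun toSnakeCase(property: String): String =
    // Insert an underscore between a lowercase letter or digit and the uppercase letter that follows it.
    property.replace(Regex("([a-z0-9])([A-Z])"), "\$1_\$2").lowercase()

fun main() {
    println(toSnakeCase("organizationId")) // organization_id
    println(toSnakeCase("name"))           // name
}
```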

Here’s an implementation of Organization entity class.

data class Organization(var name: String) {
    @Id 
    var id: Int? = null
}
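
Both entities leave id as null until the row is inserted, which is what lets the database assign the key. The following sketch imitates that behavior with an in-memory map and a counter standing in for the serial sequence. SampleOrganization and InMemoryStore are hypothetical names, and nothing here calls Spring Data; it only shows the "null id means new row" idea under that assumption.

```kotlin
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical entity with the same shape as Organization (illustration only).
data class SampleOrganization(var name: String) {
    var id: Int? = null
}

// Hypothetical in-memory store; the counter plays the role of the serial sequence.
class InMemoryStore {
    private val sequence = AtomicInteger(0)
    private val rows = mutableMapOf<Int, SampleOrganization>()

    // A null id is treated as "new": assign the next key. Otherwise overwrite the existing row.
    fun save(org: SampleOrganization): SampleOrganization {
        val id = org.id ?: sequence.incrementAndGet().also { org.id = it }
        rows[id] = org
        return org
    }

    fun count(): Int = rows.size
}

fun main() {
    val store = InMemoryStore()
    val saved = store.save(SampleOrganization("Test1"))
    println(saved.id)      // 1
    saved.name = "Renamed"
    store.save(saved)      // id is already set, so this overwrites instead of inserting
    println(store.count()) // 1
}
```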

R2DBC does not support any lists or sets. Because I’d like to return a list of employees inside the Organization object in one of the API endpoints, I have created a DTO containing such a list, as shown below.

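data class OrganizationDTO(var id: Int?, var name: String) {
    var employees : MutableList<Employee> = ArrayList()
    constructor(employees: MutableList<Employee>) : this(null, "") {
        this.employees = employees
    }
    // Used by OrganizationController.findByIdWithEmployees
    constructor(id: Int, name: String, employees: MutableList<Employee>) : this(id, name) {
        this.employees = employees
    }
}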

The SQL scripts corresponding to the created entities are shown below. The serial column type automatically creates a sequence and attaches it to the id column.

CREATE TABLE employee (
    name character varying NOT NULL,
    salary integer NOT NULL,
    id serial PRIMARY KEY,
    organization_id integer
);
CREATE TABLE organization (
    name character varying NOT NULL,
    id serial PRIMARY KEY
);

Building sample web applications

For demo purposes we will build two independent applications: employee-service and organization-service. The organization-service application communicates with employee-service using the WebFlux WebClient. It gets the list of employees assigned to the organization and includes them in the response together with the Organization object. The source code of the sample applications is available on GitHub in the repository sample-spring-data-webflux: https://github.com/piomin/sample-spring-data-webflux.
OK, let’s begin by declaring the Spring Boot main class. We need to enable Spring Data JDBC repositories by annotating the main class with @EnableJdbcRepositories.

@SpringBootApplication
@EnableJdbcRepositories
class EmployeeApplication

fun main(args: Array<String>) {
    runApplication<EmployeeApplication>(*args)
}

Working with R2DBC and Postgres requires some configuration. Probably because Spring Data JDBC and R2DBC are at an early stage of development, there is no Spring Boot auto-configuration for Postgres. We need to declare the connection factory, client, and repository inside a @Configuration class.

@Configuration
class EmployeeConfiguration {

    @Bean
    fun repository(factory: R2dbcRepositoryFactory): EmployeeRepository {
        return factory.getRepository(EmployeeRepository::class.java)
    }

    @Bean
    fun factory(client: DatabaseClient): R2dbcRepositoryFactory {
        val context = RelationalMappingContext()
        context.afterPropertiesSet()
        return R2dbcRepositoryFactory(client, context)
    }

    @Bean
    fun databaseClient(factory: ConnectionFactory): DatabaseClient {
        return DatabaseClient.builder().connectionFactory(factory).build()
    }

    @Bean
    fun connectionFactory(): PostgresqlConnectionFactory {
        val config = PostgresqlConnectionConfiguration.builder() //
                .host("192.168.99.100") //
                .port(5432) //
                .database("reactive") //
                .username("reactive") //
                .password("reactive123") //
                .build()

        return PostgresqlConnectionFactory(config)
    }

}

Finally, we can create REST controllers that contain the definitions of our reactive API methods. With Kotlin it does not take much space. The following controller definition contains three GET methods that allow us to find all employees, all employees assigned to a given organization, or a single employee by id, plus a POST method for adding a new employee.

@RestController
@RequestMapping("/employees")
class EmployeeController {

    @Autowired
    lateinit var repository : EmployeeRepository

    @GetMapping
    fun findAll() : Flux<Employee> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Employee> = repository.findById(id)

    @GetMapping("/organization/{organizationId}")
    fun findByOrganizationId(@PathVariable organizationId : Int) : Flux<Employee> = repository.findByOrganizationId(organizationId)

    @PostMapping
    fun add(@RequestBody employee: Employee) : Mono<Employee> = repository.save(employee)

}

Inter-service Communication

For the OrganizationController the implementation is a little bit more complicated. Because organization-service communicates with employee-service, we first need to declare a reactive WebFlux WebClient builder.

@Bean
fun clientBuilder() : WebClient.Builder {
   return WebClient.builder()
}

Then, similarly to the repository bean, the builder is injected into the controller. It is used inside the findByIdWithEmployees method to call the GET /employees/organization/{organizationId} endpoint exposed by employee-service. As you can see in the code fragment below, it provides a reactive API and returns a Flux containing the employees found. This list is combined with the organization into an OrganizationDTO object using the Reactor zipWith method.

@RestController
@RequestMapping("/organizations")
class OrganizationController {

    @Autowired
    lateinit var repository : OrganizationRepository
    @Autowired
    lateinit var clientBuilder : WebClient.Builder

    @GetMapping
    fun findAll() : Flux<Organization> = repository.findAll()

    @GetMapping("/{id}")
    fun findById(@PathVariable id : Int) : Mono<Organization> = repository.findById(id)

    @GetMapping("/{id}/withEmployees")
    fun findByIdWithEmployees(@PathVariable id : Int) : Mono<OrganizationDTO> {
        val employees : Flux<Employee> = clientBuilder.build().get().uri("http://localhost:8090/employees/organization/$id")
                .retrieve().bodyToFlux(Employee::class.java)
        val org : Mono<Organization> = repository.findById(id)
        return org.zipWith(employees.collectList())
                .map { tuple -> OrganizationDTO(tuple.t1.id as Int, tuple.t1.name, tuple.t2) }
    }

    @PostMapping
    fun add(@RequestBody organization: Organization) : Mono<Organization> = repository.save(organization)

}

How does it work?

Before running the tests we need to start a Postgres database. Here’s the Docker command used for running a Postgres container. It creates a user with a password and sets up a default database.

$ docker run -d --name postgres -p 5432:5432 -e POSTGRES_USER=reactive -e POSTGRES_PASSWORD=reactive123 -e POSTGRES_DB=reactive postgres

Then we need to create some test tables, so you have to run the SQL script from the section Implementing Entities and DTOs. After that you can start our test applications. If you do not override the default settings provided inside the application.yml files, employee-service listens on port 8090 and organization-service on port 8095. The following picture illustrates the architecture of our sample system.

[Image: spring-data-1]

Now, let’s add some test data using the reactive API exposed by the applications.

$ curl -d '{"name":"Test1"}' -H "Content-Type: application/json" -X POST http://localhost:8095/organizations
$ curl -d '{"name":"Name1", "salary":5000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees
$ curl -d '{"name":"Name2", "salary":10000, "organizationId":1}' -H "Content-Type: application/json" -X POST http://localhost:8090/employees
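
The same requests can be sent from Kotlin using the HTTP client built into the JDK (Java 11 or later). This is just a sketch: jsonPost is a hypothetical helper, the ports are the defaults mentioned above, and the JSON field is named salary to match the Employee class.

```kotlin
import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse

// Hypothetical helper that builds a JSON POST request for the given URL.
fun jsonPost(url: String, json: String): HttpRequest =
    HttpRequest.newBuilder(URI.create(url))
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(json))
        .build()

fun main() {
    val client = HttpClient.newHttpClient()
    val requests = listOf(
        jsonPost("http://localhost:8095/organizations", """{"name":"Test1"}"""),
        jsonPost("http://localhost:8090/employees", """{"name":"Name1","salary":5000,"organizationId":1}"""),
        jsonPost("http://localhost:8090/employees", """{"name":"Name2","salary":10000,"organizationId":1}""")
    )
    // Both services must be running; otherwise send() fails with a connection error.
    for (request in requests) {
        val response = client.send(request, HttpResponse.BodyHandlers.ofString())
        println("${response.statusCode()} ${response.body()}")
    }
}
```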

Finally, you can call the GET /organizations/{id}/withEmployees endpoint, for example using your web browser. The result should be similar to the one shown in the following picture.

[Image: spring-data-2]
