Spring Data Archives - Piotr's TechBlog
https://piotrminkowski.com/tag/spring-data/

Getting Started with Spring Cloud Azure
https://piotrminkowski.com/2023/12/07/getting-started-with-spring-cloud-azure/
Thu, 07 Dec 2023 14:19:12 +0000
This article will teach you how to use Spring Cloud to simplify integration between Spring Boot apps and Azure services. We will also see how to leverage the Azure Spring Apps service to deploy, run, and manage our app on Azure. Our sample Spring Boot app stores data in the Azure Cosmos DB service and exposes some REST endpoints under the public URL. We can run it locally and connect remote services or deploy it on the cloud and connect those services internally under the same virtual network.

If you need an introduction to Spring Cloud, read my article about microservices with Spring Boot 3 and Spring Cloud available here. It is also worth at least taking a look at the Spring Cloud Azure docs for a basic understanding of the main concepts.

Architecture

Our architecture is pretty simple. As I mentioned before, we have a single Spring Boot app (account-service in the diagram) that runs on Azure and connects to Cosmos DB. It exposes some REST endpoints for adding, deleting, or searching accounts backed by Cosmos DB. It also stores the whole required configuration (like Cosmos DB address and credentials) in the Azure App Configuration service. The app is managed by the Azure Spring Apps service. Here’s the diagram illustrating our architecture.

spring-cloud-azure-arch

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. The Spring Boot app used in the article is located in the microservices/account-service directory. After you go to that directory you should just follow my further instructions.

There are some prerequisites before you start the exercise. You need to install JDK17+ and Maven on your local machine. You also need to have an account on Azure and az CLI to interact with that account. In order to deploy the app on Azure we will use azure-spring-apps-maven-plugin, which requires az CLI.

Dependencies

Firstly, let’s take a look at the list of required Maven dependencies. Of course, we need to add the Spring Boot Web starter to enable REST support through the Spring MVC module. In order to integrate with Cosmos DB, we will use the Spring Data repositories. Spring Cloud Azure provides a dedicated starter spring-cloud-azure-starter-data-cosmos for it. The spring-cloud-azure-starter-actuator module is optional. It will enable a health indicator for Cosmos DB in the /actuator/health endpoint. After that, we will include the starter providing integration with the Azure App Configuration service. Finally, we can add the Springdoc OpenAPI project responsible for generating REST API documentation.

<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
  </dependency>
  <dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-actuator</artifactId>
  </dependency>
  <dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-data-cosmos</artifactId>
  </dependency>
  <dependency>
    <groupId>com.azure.spring</groupId>
    <artifactId>spring-cloud-azure-starter-appconfiguration-config</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springdoc</groupId>
    <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
    <version>2.2.0</version>
  </dependency>
</dependencies>

Spring Cloud with Azure Cosmos DB

After including the Spring Data module with Cosmos DB support we may define the model class. The Account class contains three String fields: id (primary key), number, and customerId (partition key). The partition key is responsible for dividing data into distinct subsets called logical partitions. The model must be annotated with @Container. The containerName parameter inside the annotation corresponds to the name of the Cosmos DB container created in Azure.

@Container(containerName = "accounts")
public class Account {
   @Id
   @GeneratedValue
   private String id;
   private String number;
   @PartitionKey
   private String customerId;

   // GETTERS AND SETTERS ...
}
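The partition key deserves a brief illustration. Cosmos DB routes every document to a logical partition based on a hash of its partition key value, so all accounts of a single customer end up in the same partition. The following plain-Java sketch (a toy stand-in, not Cosmos DB’s actual hashing algorithm) shows the idea:

```java
import java.util.*;

public class PartitionSketch {
    // Toy stand-in for Cosmos DB's hash-based routing: the same
    // partition key value always maps to the same logical partition.
    static int partitionFor(String partitionKey, int partitionCount) {
        return Math.floorMod(partitionKey.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        Map<Integer, List<String>> partitions = new HashMap<>();
        // Two accounts of customer "1" and one account of customer "2"
        for (String[] acc : new String[][]{
                {"acc-1", "1"}, {"acc-2", "1"}, {"acc-3", "2"}}) {
            partitions.computeIfAbsent(partitionFor(acc[1], 4),
                    k -> new ArrayList<>()).add(acc[0]);
        }
        // Accounts sharing a customerId share a logical partition
        System.out.println(partitions);
    }
}
```

Because queries filtered by customerId hit a single partition, the findByCustomerId lookup we implement later stays cheap even as the container grows.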

Now, let’s prepare our environment in Azure. After logging in with the az login CLI command we create the resource group for our exercise. The name of the group is sample-spring-cloud. The location depends on your preferences. For me it is eastus.

$ az group create -l eastus -n sample-spring-cloud

Then, we are going to create a new Azure Cosmos DB database account. The name of my account is sample-pminkows-cosmosdb. It is placed inside our sample-spring-cloud resource group. I’ll leave the default values in all other parameters. But you can consider overriding some parameters to decrease the instance cost. For example, we can set the Local backup redundancy type using the --backup-redundancy parameter.

$ az cosmosdb create -n sample-pminkows-cosmosdb -g sample-spring-cloud

Once we enable a database account we can create a database instance. The name of our database is sampledb. Of course, it has to be placed in the previously created sample-pminkows-cosmosdb Cosmos DB account.

$ az cosmosdb sql database create \
    -a sample-pminkows-cosmosdb \
    -n sampledb \
    -g sample-spring-cloud

Finally, we need to create a container inside our database. The name of the container should be the same as the value of the containerName field declared in the model class. We also have to set the partition key path. As you probably remember, we are using the customerId field in the Account class for that.

$ az cosmosdb sql container create \
    -a sample-pminkows-cosmosdb \
    -g sample-spring-cloud \
    -n accounts \
    -d sampledb \
    -p /customerId

Everything is ready on the Azure side. Let’s go back for a moment to the source code. In order to interact with the database, we will create a Spring Data repository interface. It has to extend the CosmosRepository interface provided by the Spring Data Cosmos module. It also defines one additional method for searching by the customerId field.

public interface AccountRepository extends CosmosRepository<Account, String> {
   List<Account> findByCustomerId(String customerId);
}

Finally, we can create @RestController with the endpoints implementation. It injects and uses the AccountRepository bean.

@RestController
@RequestMapping("/accounts")
public class AccountController {

   private final static Logger LOG = LoggerFactory
      .getLogger(AccountController.class);
   private final AccountRepository repository;

   public AccountController(AccountRepository repository) {
      this.repository = repository;
   }

   @PostMapping
   public Account add(@RequestBody Account account) {
      LOG.info("add: {}", account.getNumber());
      return repository.save(account);
   }

   @GetMapping("/{id}")
   public Account findById(@PathVariable String id) {
      LOG.info("findById: {}", id);
      return repository.findById(id).orElseThrow();
   }

   @GetMapping
   public List<Account> findAll() {
      List<Account> accounts = new ArrayList<>();
      repository.findAll().forEach(accounts::add);
      return accounts;
   }

   @GetMapping("/customer/{customerId}")
   public List<Account> findByCustomerId(@PathVariable String customerId) {
      LOG.info("findByCustomerId: {}", customerId);
      return repository.findByCustomerId(customerId);
   }
}

Azure App Configuration with Spring Cloud

Once we finish the app implementation, we can run it and connect with Cosmos DB. Of course, we need to set the connection URL and credentials. Let’s switch to the Azure Portal. We need to find the “Azure Cosmos DB” service in the main menu. Then click your database account. You will see the address of the endpoint as shown below. You should also see the previously created container in the “Containers” section.

In order to obtain the connection key, we need to go to the “Data Explorer” item in the left-side menu. Then choose the “Connect” tile. You will find the key in the target window.

spring-cloud-azure-cosmosdb

We could easily set all the required connection parameters using the spring.cloud.azure.cosmos.* properties. However, I would like to store all the configuration settings on Azure. Spring Cloud comes with built-in support for Azure App Configuration service. We have already included the required Spring Cloud starter. So now, we need to enable the Azure App Configuration service and put our properties into the store. Here’s the command for creating an App Configuration under the sample-spring-cloud-config name:

$ az appconfig create \
    -g sample-spring-cloud \
    -n sample-spring-cloud-config \
    -l eastus \
    --sku Standard

Once we create the App Configuration we can put our configuration settings in the key/value form. By default, Spring Cloud Azure loads configuration keys that start with the /application/ prefix. We need to add three Spring Cloud properties: spring.cloud.azure.cosmos.key, spring.cloud.azure.cosmos.database, and spring.cloud.azure.cosmos.endpoint.

$ az appconfig kv set \
    -n sample-spring-cloud-config \
    --key /application/spring.cloud.azure.cosmos.key \
    --value <YOUR_PRIMARY_KEY>

$ az appconfig kv set \
    -n sample-spring-cloud-config \
    --key /application/spring.cloud.azure.cosmos.database \
    --value sampledb

$ az appconfig kv set \
    -n sample-spring-cloud-config \
    --key /application/spring.cloud.azure.cosmos.endpoint \
    --value <YOUR_ENDPOINT_URI>

Let’s switch to the Azure Portal to check the configuration settings. We need to find the “App Configuration” service in the main dashboard. Then go to the sample-spring-cloud-config details and choose the “Configuration explorer” menu item. You should see all your application properties prefixed with /application/. I also overrode some Spring Actuator settings to enable health check details and additional management endpoints.

spring-cloud-azure-app-configuration

That’s all. Now, we are ready to run our app. We just need to connect it to the Azure App Configuration service. In order to do that, we need to obtain its connection endpoint and credentials. You can go to the “Access keys” menu item in the “Settings” section. Then you should copy the value from the “Connection string” field as shown below. Alternatively, you can obtain the same information by executing the following CLI command: az appconfig credential list --name sample-spring-cloud-config.

Let’s save the value inside the APP_CONFIGURATION_CONNECTION_STRING environment variable. After that, we just need to create the Spring bootstrap.properties file in the src/main/resources directory containing the spring.cloud.azure.appconfiguration.stores[0].connection-string property.

spring.cloud.azure.appconfiguration.stores[0].connection-string=${APP_CONFIGURATION_CONNECTION_STRING}

Running Spring Boot App Locally

Finally, we can run our sample Spring Boot app. For now, we will just run it locally. As a result, it will connect to the Azure App Configuration and Cosmos DB deployed on the cloud. We can execute the following Maven command to start the app:

$ mvn clean spring-boot:run

Once you start the app you should see in the logs that it loads property sources from the Azure store. If everything works fine, the app loads its settings from Azure App Configuration and connects to the Cosmos DB instance:

spring-cloud-azure-logs

Once you start the app you can access it under the 8080 local port. The Swagger UI is available under the /swagger-ui.html path:

spring-cloud-azure-swagger

We can add some data using e.g. the curl command as shown below:

$ curl -X 'POST' 'http://localhost:8080/accounts' \
    -H 'Content-Type: application/json' \
    -d '{"number": "1234567893","customerId": "1"}'
{"id":"5301e9dd-0556-40b7-9ea3-96975492f00c","number":"1234567893","customerId":"1"}

Then, we can e.g. find accounts owned by a particular customer:

$ curl http://localhost:8080/accounts/customer/1

We can also delete an existing account by calling the DELETE /accounts/{id} endpoint. In that case, I received the HTTP 404 Not Found error. Interesting, right?

Let’s see what happened. If you take a look at the implementation of AccountController you will find the method for the DELETE endpoint, right? In the meantime, I added one method annotated with @FeatureGate. This annotation is provided by Spring Cloud Azure. The following fragment of code shows the usage of feature management with Azure App Configuration. In fact, I’m using the “Feature Gate” functionality, which allows us to call the endpoint only if a feature is enabled on the Azure side. The name of our feature is delete-account.

@DeleteMapping("/{id}")
@FeatureGate(feature = "delete-account")
public void deleteById(@PathVariable String id) {
   repository.deleteById(id);
}

Now, the only thing we need to do is to add a new feature to the sample-spring-cloud-config App Configuration.

$ az appconfig feature set -n sample-spring-cloud-config --feature delete-account

Let’s switch to the Azure Portal. You should go to the “Feature manager” menu item in the “Operations” section. As you see, by default the feature flag is disabled. It means the feature is not active and the endpoint is disabled.

spring-cloud-azure-feature

You can enable the feature by clicking the checkbox button and then restart the app. After that, the DELETE endpoint should be available.
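Conceptually, the gate boils down to a flag lookup before the handler runs. Here is a framework-free sketch of that behavior (plain Java; the hypothetical FeatureGateSketch class stands in for the real check that Spring Cloud Azure performs against the App Configuration store):

```java
import java.util.Map;

public class FeatureGateSketch {
    // Flags as they would be loaded from Azure App Configuration
    private final Map<String, Boolean> flags;

    public FeatureGateSketch(Map<String, Boolean> flags) {
        this.flags = flags;
    }

    // Mimics @FeatureGate: run the handler only if the flag is on,
    // otherwise behave as if the endpoint did not exist.
    public int deleteById(String id) {
        if (!flags.getOrDefault("delete-account", false)) {
            return 404; // endpoint hidden while the feature is disabled
        }
        // repository.deleteById(id) would run here
        return 200;
    }
}
```

With the flag absent or set to false, the call behaves like a missing endpoint (HTTP 404), which matches the error we observed earlier.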

Deploy Spring Cloud App on Azure

We can deploy our sample app to Azure in several different ways. I’ll choose the service dedicated especially to Spring Boot – Azure Spring Apps.

The installation from the Azure Portal is pretty straightforward, so I won’t get into the details. The name of our instance (cluster) is sample-spring-cloud-apps. We don’t need to know anything more to be able to deploy our app there.

Azure provides several Maven plugins for deploying apps. For Azure Spring Apps we should use azure-spring-apps-maven-plugin. We need to set the Azure Spring Apps instance in the clusterName parameter. The name of our app is account-service. We should also choose SKU and set the Azure subscription ID (loaded from the SUBSCRIPTION environment variable). In the deployment section, we need to define the required resources (RAM and CPU), number of running instances, Java version, and a single environment variable containing the connection string to the Azure App Configuration instance.

<plugin>
  <groupId>com.microsoft.azure</groupId>
  <artifactId>azure-spring-apps-maven-plugin</artifactId>
  <version>1.19.0</version>
  <configuration>
    <subscriptionId>${env.SUBSCRIPTION}</subscriptionId>
    <resourceGroup>sample-spring-cloud</resourceGroup>
    <clusterName>sample-spring-cloud-apps</clusterName>
    <sku>Consumption</sku>
    <appName>account-service</appName>
    <isPublic>true</isPublic>
    <deployment>
      <cpu>0.5</cpu>
      <memoryInGB>1</memoryInGB>
      <instanceCount>1</instanceCount>
      <runtimeVersion>Java 17</runtimeVersion>
      <environment>
        <APP_CONFIGURATION_CONNECTION_STRING>
          ${env.APP_CONFIGURATION_CONNECTION_STRING}
        </APP_CONFIGURATION_CONNECTION_STRING>
      </environment>
      <resources>
        <resource>
          <directory>target/</directory>
          <includes>
            <include>*.jar</include>
          </includes>
        </resource>
      </resources>
    </deployment>
  </configuration>
</plugin>

Then we need to build the app and deploy it on Azure Spring Apps with the following command:

$ mvn clean package azure-spring-apps:deploy

You should have a similar result as shown below:

Does the name of the instance sound familiar? πŸ™‚ Under the hood it’s Kubernetes. The Azure Spring Apps service uses Azure Container Apps for running containers. On the other hand, Azure Container Apps is hosted on the Kubernetes cluster. But these are the details. What is important here – our app has already been deployed on Azure.

spring-cloud-azure-spring-apps

We can display the account-service app details. The app is exposed under the public URL. We just need to copy the link.

Let’s take a look at the configuration section. As you see it contains the connection string to the App Configuration endpoint.

We can display the Swagger UI and perform some test calls.

Final Thoughts

That’s all in this article, but I’m planning to create several others about Spring Boot and Azure soon! Azure seems to be a friendly platform for the Spring Boot developer πŸ™‚ I showed you how to easily integrate your Spring Boot app with the most popular Azure services like Cosmos DB. We also covered such topics as configuration management and feature flags (gates) with the App Configuration service. Finally, we deployed the app on… Kubernetes through the Azure Spring Apps service πŸ™‚

Guide to Modulith with Spring Boot
https://piotrminkowski.com/2023/10/13/guide-to-modulith-with-spring-boot/
Fri, 13 Oct 2023 14:05:33 +0000
This article will teach you how to build a modulith with Spring Boot and use the Spring Modulith project features. Modulith is a software architecture pattern that assumes organizing your monolith app into logical modules. Such modules should be as independent of each other as possible. Modulith balances monolithic and microservices-based architectures. It can be your target model for organizing the app. But you can also treat it just as a transitional phase during migration from a monolithic to a microservices-based approach. Spring Modulith will help us build a well-structured Spring Boot app and verify dependencies between the logical modules.

We will compare this approach with the microservices-based architecture. In order to do that, we will implement functionality very similar to that described in my latest article about building microservices with Spring Cloud and Spring Boot 3.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

Before we start, let’s take a look at the following diagram. It illustrates the architecture of our sample system. We have three independent modules, which communicate with each other: employee, department, and organization. There is also the gateway module. It is responsible for exposing internal services as the REST endpoints outside of the app. Our modules send traces to the Zipkin instance using the support provided within the Spring Modulith project.

spring-boot-modulith-arch

If you want to compare it with the similar microservices architecture described in the previously mentioned article here’s the diagram.

Let’s take a look at the structure of our code. By default, each direct sub-package of the main package is considered an application module package. So there are four application modules: department, employee, gateway, and organization. Each module contains “provided interfaces” exposed to the other modules. We need to place them in the application module root directory. Other modules cannot access any classes or beans from application module sub-packages. We will discuss it in detail in the next sections.

src/main/java
└── pl
    └── piomin
        └── services
            β”œβ”€β”€ OrganizationAddEvent.java
            β”œβ”€β”€ OrganizationRemoveEvent.java
            β”œβ”€β”€ SpringModulith.java
            β”œβ”€β”€ department
            β”‚   β”œβ”€β”€ DepartmentDTO.java
            β”‚   β”œβ”€β”€ DepartmentExternalAPI.java
            β”‚   β”œβ”€β”€ DepartmentInternalAPI.java
            β”‚   β”œβ”€β”€ management
            β”‚   β”‚   β”œβ”€β”€ DepartmentManagement.java
            β”‚   β”‚   └── package-info.java
            β”‚   β”œβ”€β”€ mapper
            β”‚   β”‚   └── DepartmentMapper.java
            β”‚   β”œβ”€β”€ model
            β”‚   β”‚   └── Department.java
            β”‚   └── repository
            β”‚       └── DepartmentRepository.java
            β”œβ”€β”€ employee
            β”‚   β”œβ”€β”€ EmployeeDTO.java
            β”‚   β”œβ”€β”€ EmployeeExternalAPI.java
            β”‚   β”œβ”€β”€ EmployeeInternalAPI.java
            β”‚   β”œβ”€β”€ management
            β”‚   β”‚   └── EmployeeManagement.java
            β”‚   β”œβ”€β”€ mapper
            β”‚   β”‚   └── EmployeeMapper.java
            β”‚   β”œβ”€β”€ model
            β”‚   β”‚   └── Employee.java
            β”‚   └── repository
            β”‚       └── EmployeeRepository.java
            β”œβ”€β”€ gateway
            β”‚   └── GatewayManagement.java
            └── organization
                β”œβ”€β”€ OrganizationDTO.java
                β”œβ”€β”€ OrganizationExternalAPI.java
                β”œβ”€β”€ management
                β”‚   └── OrganizationManagement.java
                β”œβ”€β”€ mapper
                β”‚   └── OrganizationMapper.java
                β”œβ”€β”€ model
                β”‚   └── Organization.java
                └── repository
                    └── OrganizationRepository.java

Dependencies

Let’s take a look at the list of required dependencies. Our app exposes some REST endpoints and connects to the embedded H2 database. So, we need to include the Spring Web and Spring Data JPA projects. In order to use Spring Modulith, we have to add the spring-modulith-starter-core starter, along with spring-modulith-starter-jpa for the JPA-based integration. We will also do some mapping between entities and DTO classes. Therefore we will include the MapStruct project, which simplifies mappings between Java beans.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-starter-core</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.modulith</groupId>
  <artifactId>spring-modulith-starter-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.mapstruct</groupId>
  <artifactId>mapstruct</artifactId>
  <version>1.5.5.Final</version>
</dependency>

The Structure of Application Modules

We will analyze the structure of our modules on the example of the employee module. All the interfaces/classes in the module root directory can be called from other modules (green color). Other modules cannot call any interfaces/classes from module sub-packages (red color).

spring-boot-modulith-code-structure

The app implementation is not complicated. Here’s our Employee entity class:

@Entity
public class Employee {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private Long organizationId;
   private Long departmentId;
   private String name;
   private int age;
   private String position;

   // ... GETTERS/SETTERS
}

We are using the Spring Data JPA Repository pattern to interact with the H2 database. Instead of the entity classes, we are returning the DTO objects using the Spring Data projection feature.

public interface EmployeeRepository extends CrudRepository<Employee, Long> {
   List<EmployeeDTO> findByDepartmentId(Long departmentId);
   List<EmployeeDTO> findByOrganizationId(Long organizationId);
   void deleteByOrganizationId(Long organizationId);
}
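Spring Data builds those DTO projections for us, but the idea is easy to show without the framework. The sketch below (with hypothetical Row and EmployeeDTO records, not Spring Data’s actual machinery) illustrates how selected columns are turned into read-only records instead of managed entities:

```java
import java.util.*;
import java.util.stream.*;

public class ProjectionSketch {
    record Row(Long id, Long departmentId, String name) {}      // selected columns
    record EmployeeDTO(Long id, Long departmentId, String name) {}

    // A DTO projection hands back immutable records built from the
    // selected columns; no entity instances are tracked or returned.
    static List<EmployeeDTO> findByDepartmentId(List<Row> rows, Long departmentId) {
        return rows.stream()
                .filter(r -> departmentId.equals(r.departmentId()))
                .map(r -> new EmployeeDTO(r.id(), r.departmentId(), r.name()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Row> rows = List.of(new Row(1L, 10L, "Smith"),
                                 new Row(2L, 20L, "Jones"));
        System.out.println(findByDepartmentId(rows, 10L).size()); // 1
    }
}
```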

Here’s our DTO record. It is exposed outside the module because other modules will have to access Employee data. We don’t want to expose entity class directly, so DTO is a very useful pattern here.

public record EmployeeDTO(Long id,
                          Long organizationId,
                          Long departmentId,
                          String name,
                          int age,
                          String position) {
}

Let’s also define a mapper between entity and DTO using the mapstruct support.

@Mapper(componentModel = MappingConstants.ComponentModel.SPRING)
public interface EmployeeMapper {
    EmployeeDTO employeeToEmployeeDTO(Employee employee);
    Employee employeeDTOToEmployee(EmployeeDTO employeeDTO);
}
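MapStruct generates an implementation of this interface at build time. A hand-written equivalent (a sketch, with the types inlined for brevity) shows that the generated code is essentially a field-by-field copy, with no reflection at runtime:

```java
public class EmployeeMapperSketch {
    record EmployeeDTO(Long id, Long organizationId, Long departmentId,
                       String name, int age, String position) {}

    static class Employee {
        Long id; Long organizationId; Long departmentId;
        String name; int age; String position;
    }

    // Roughly what the generated employeeToEmployeeDTO boils down to:
    // a null check followed by a plain field-by-field copy.
    static EmployeeDTO employeeToEmployeeDTO(Employee e) {
        if (e == null) return null;
        return new EmployeeDTO(e.id, e.organizationId, e.departmentId,
                e.name, e.age, e.position);
    }

    public static void main(String[] args) {
        Employee e = new Employee();
        e.id = 1L; e.name = "Smith"; e.age = 30; e.position = "dev";
        System.out.println(employeeToEmployeeDTO(e));
    }
}
```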

We want to hide the implementation details of the module’s main @Service from other modules. Therefore we will expose the required methods via an interface. Other modules will use the interface to call the @Service methods. The InternalAPI suffix means that the interface is intended only for internal usage between the modules.

public interface EmployeeInternalAPI {

   List<EmployeeDTO> getEmployeesByDepartmentId(Long id);
   List<EmployeeDTO> getEmployeesByOrganizationId(Long id);

}

In order to expose some @Service methods outside the app as the REST endpoints we will use the ExternalAPI suffix in the interface name. For the employee module, we only expose the method for adding new employees.

public interface EmployeeExternalAPI {
   EmployeeDTO add(EmployeeDTO employee);
}

Our management @Service implements both external and internal interfaces. It injects and uses the repository and mapper beans. There are two internal methods used by the department and organization modules (1), a single external method exposed as the REST endpoint (2), and a method for processing asynchronous events from other modules (3). We will discuss the last of these methods later.

@Service
public class EmployeeManagement implements EmployeeInternalAPI, 
                                           EmployeeExternalAPI {

   private static final Logger LOG = LoggerFactory
      .getLogger(EmployeeManagement.class);
   private EmployeeRepository repository;
   private EmployeeMapper mapper;

   public EmployeeManagement(EmployeeRepository repository,
                             EmployeeMapper mapper) {
      this.repository = repository;
      this.mapper = mapper;
   }

   @Override // (1)
   public List<EmployeeDTO> getEmployeesByDepartmentId(Long departmentId) {
      return repository.findByDepartmentId(departmentId);
   }

   @Override // (1)
   public List<EmployeeDTO> getEmployeesByOrganizationId(Long id) {
      return repository.findByOrganizationId(id);
   }

   @Override
   @Transactional // (2)
   public EmployeeDTO add(EmployeeDTO employee) {
      Employee emp = mapper.employeeDTOToEmployee(employee);
      return mapper.employeeToEmployeeDTO(repository.save(emp));
   }

   @ApplicationModuleListener // (3)
   void onRemovedOrganizationEvent(OrganizationRemoveEvent event) {
      LOG.info("onRemovedOrganizationEvent(orgId={})", event.getId());
      repository.deleteByOrganizationId(event.getId());
   }

}
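The publishing side is not shown above; in Spring, the organization module hands an OrganizationRemoveEvent to the ApplicationEventPublisher, and Spring Modulith delivers it to @ApplicationModuleListener methods asynchronously after the publishing transaction commits. Stripped of those guarantees, the contract reduces to a simple in-process publish/subscribe, sketched here without Spring:

```java
import java.util.*;
import java.util.function.Consumer;

public class EventSketch {
    record OrganizationRemoveEvent(Long id) {}

    // Minimal in-process event bus standing in for Spring's
    // ApplicationEventPublisher + @ApplicationModuleListener pair
    // (the real delivery is asynchronous and transactional).
    static class Bus {
        private final List<Consumer<OrganizationRemoveEvent>> listeners = new ArrayList<>();
        void subscribe(Consumer<OrganizationRemoveEvent> l) { listeners.add(l); }
        void publish(OrganizationRemoveEvent e) { listeners.forEach(l -> l.accept(e)); }
    }

    public static void main(String[] args) {
        List<Long> deletedOrgIds = new ArrayList<>();
        Bus bus = new Bus();
        // employee module: reacts like onRemovedOrganizationEvent
        bus.subscribe(e -> deletedOrgIds.add(e.id()));
        // organization module: publishes after removing an organization
        bus.publish(new OrganizationRemoveEvent(5L));
        System.out.println(deletedOrgIds); // [5]
    }
}
```

The key architectural point survives the simplification: the employee module never calls the organization module; it only reacts to events, which keeps the dependency direction clean.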

Verify Dependencies with Spring Modulith

Let’s switch to the department module. It needs to access data exposed by the employee module. In order to do that, it will use the methods provided within the EmployeeInternalAPI interface. The implementation in the form of the EmployeeManagement class should be hidden from the department module. However, let’s imagine that the department module calls the EmployeeManagement bean directly. Here’s the fragment of the DepartmentManagement implementation:

@Service
public class DepartmentManagement {

   private DepartmentRepository repository;
   private EmployeeManagement employeeManagement;
   private DepartmentMapper mapper;

   public DepartmentManagement(DepartmentRepository repository,
                               EmployeeManagement employeeManagement,
                               DepartmentMapper mapper) {
      this.repository = repository;
      this.employeeManagement = employeeManagement;
      this.mapper = mapper;
   }

   public DepartmentDTO getDepartmentByIdWithEmployees(Long id) {
      DepartmentDTO d = repository.findDTOById(id);
      List<EmployeeDTO> dtos = employeeManagement
         .getEmployeesByDepartmentId(id);
      d.employees().addAll(dtos);
      return d;
   }
}

Here comes the Spring Modulith project. For example, we can create a JUnit test to verify the dependencies between app modules. Once we break the modulith rules, our test will fail. Let’s see how we can leverage the verify() method provided by the Spring Modulith ApplicationModules class:

public class SpringModulithTests {

   ApplicationModules modules = ApplicationModules.of(SpringModulith.class);

   @Test
   void shouldBeCompliant() {
      modules.verify();
   }
}

Here’s the verification result for the previously introduced implementation of DepartmentManagement bean:

Now, let’s do it in the proper way. First of all, the DepartmentDTO record uses the EmployeeDTO. This relation is represented only at the DTO level.

public record DepartmentDTO(Long id,
                            Long organizationId,
                            String name,
                            List<EmployeeDTO> employees) {
    public DepartmentDTO(Long id, Long organizationId, String name) {
        this(id, organizationId, name, new ArrayList<>());
    }
}
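The extra constructor delegates to the canonical one with an empty, mutable list, so getDepartmentByIdWithEmployees can aggregate employees into it after the query returns. A quick framework-free check of that behavior (a sketch, with a trimmed-down EmployeeDTO):

```java
import java.util.*;

public class DepartmentDTOSketch {
    record EmployeeDTO(Long id, String name) {}
    record DepartmentDTO(Long id, Long organizationId, String name,
                         List<EmployeeDTO> employees) {
        // Convenience constructor used by the JPQL constructor expression
        DepartmentDTO(Long id, Long organizationId, String name) {
            this(id, organizationId, name, new ArrayList<>());
        }
    }

    public static void main(String[] args) {
        // The query produces the three-argument form...
        DepartmentDTO d = new DepartmentDTO(1L, 1L, "HR");
        // ...and the employee list is filled in afterwards
        d.employees().add(new EmployeeDTO(1L, "Smith"));
        System.out.println(d.employees().size()); // 1
    }
}
```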

There are no relations between the entities and the corresponding database tables. We want our modules to be independent at the database level. Although there are no relations between the tables, we still use a single database. Here’s the Department entity class:

@Entity
public class Department {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private Long organizationId;
   private String name;

   // ... GETTERS/SETTERS
}

The same as before, there is a mapper to convert between the entity and DTO:

@Mapper(componentModel = MappingConstants.ComponentModel.SPRING)
public interface DepartmentMapper {
   DepartmentDTO departmentToDepartmentDTO(Department department);
   Department departmentDTOToDepartment(DepartmentDTO departmentDTO);
}

Here’s the repository interface:

public interface DepartmentRepository extends CrudRepository<Department, Long> {

   @Query("""
          SELECT new pl.piomin.services.department.DepartmentDTO(d.id, d.organizationId, d.name)
          FROM Department d
          WHERE d.id = :id
          """)
   DepartmentDTO findDTOById(Long id);

   @Query("""
          SELECT new pl.piomin.services.department.DepartmentDTO(d.id, d.organizationId, d.name)
          FROM Department d
          WHERE d.organizationId = :organizationId
          """)
   List<DepartmentDTO> findByOrganizationId(Long organizationId);

   void deleteByOrganizationId(Long organizationId);
}
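The JPQL constructor expressions above project query results straight into DTOs instead of loading entities. Conceptually, each selected row is passed to the DTO constructor, which can be sketched in plain Java (the record and the in-memory "rows" below are stand-ins for illustration, not the article's actual classes):

```java
import java.util.List;
import java.util.stream.Collectors;

public class ProjectionSketch {

    // Simplified stand-in for the article's DepartmentDTO.
    record DepartmentDTO(Long id, Long organizationId, String name) {}

    // Conceptual equivalent of:
    // SELECT new DepartmentDTO(d.id, d.organizationId, d.name)
    // FROM Department d WHERE d.organizationId = :organizationId
    static List<DepartmentDTO> findByOrganizationId(List<Object[]> rows, Long orgId) {
        return rows.stream()
                .filter(row -> row[1].equals(orgId))
                .map(row -> new DepartmentDTO((Long) row[0], (Long) row[1], (String) row[2]))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Object[]> rows = List.of(
                new Object[]{1L, 10L, "HR"},
                new Object[]{2L, 20L, "Management"});
        System.out.println(findByOrganizationId(rows, 10L));
        // prints [DepartmentDTO[id=1, organizationId=10, name=HR]]
    }
}
```

The point of the projection is that the department module hands out plain DTOs, never its JPA entities.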

The department module calls methods exposed by the employee module, but it also provides methods for the organization module. Once again we are creating the *InternalAPI interface.

public interface DepartmentInternalAPI {
    List<DepartmentDTO> getDepartmentsByOrganizationId(Long id);
}

Here's the interface with the methods exposed outside the app as REST endpoints.

public interface DepartmentExternalAPI {
    DepartmentDTO getDepartmentByIdWithEmployees(Long id);
    DepartmentDTO add(DepartmentDTO department);
}

Finally, we can implement the DepartmentManagement bean. Once again, it contains methods for synchronous calls and two methods for processing events asynchronously (annotated with @ApplicationModuleListener).

@Service
public class DepartmentManagement implements DepartmentInternalAPI, DepartmentExternalAPI {

   private static final Logger LOG = LoggerFactory
      .getLogger(DepartmentManagement.class);
   private DepartmentRepository repository;
   private EmployeeInternalAPI employeeInternalAPI;
   private DepartmentMapper mapper;

   public DepartmentManagement(DepartmentRepository repository,
                               EmployeeInternalAPI employeeInternalAPI,
                               DepartmentMapper mapper) {
      this.repository = repository;
      this.employeeInternalAPI = employeeInternalAPI;
      this.mapper = mapper;
   }

   @Override
   public DepartmentDTO getDepartmentByIdWithEmployees(Long id) {
      DepartmentDTO d = repository.findDTOById(id);
      List<EmployeeDTO> dtos = employeeInternalAPI
         .getEmployeesByDepartmentId(id);
      d.employees().addAll(dtos);
      return d;
   }

   @ApplicationModuleListener
   void onNewOrganizationEvent(OrganizationAddEvent event) {
      LOG.info("onNewOrganizationEvent(orgId={})", event.getId());
      add(new DepartmentDTO(null, event.getId(), "HR"));
      add(new DepartmentDTO(null, event.getId(), "Management"));
   }

   @ApplicationModuleListener
   void onRemovedOrganizationEvent(OrganizationRemoveEvent event) {
      LOG.info("onRemovedOrganizationEvent(orgId={})", event.getId());
      repository.deleteByOrganizationId(event.getId());
   }

   @Override
   public DepartmentDTO add(DepartmentDTO department) {
      return mapper.departmentToEmployeeDTO(
         repository.save(mapper.departmentDTOToEmployee(department))
      );
   }

   @Override
   public List<DepartmentDTO> getDepartmentsByOrganizationId(Long id) {
      return repository.findByOrganizationId(id);
   }
}

Processing Asynchronous Events

Until now, we have discussed synchronous communication between the application modules. That is usually the most common form of communication we need. However, in some cases, we can rely on asynchronous events exchanged between the modules. Spring Boot and Spring Modulith provide support for such an approach. It is based on Spring's ApplicationEvent mechanism.
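Before diving into the Spring code, the decoupling that events give us can be sketched in a few lines of plain Java. This is only an illustration of the publish/subscribe idea, not Spring's actual implementation: the publisher knows the event type, but nothing about the listeners.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class EventBusSketch {

    // Simplified stand-in for the article's event class.
    record OrganizationRemoveEvent(Long id) {}

    // Stand-in for Spring's internal listener registry.
    static final List<Consumer<OrganizationRemoveEvent>> listeners = new ArrayList<>();

    // Stand-in for ApplicationEventPublisher.publishEvent(...).
    static void publish(OrganizationRemoveEvent event) {
        listeners.forEach(listener -> listener.accept(event));
    }

    public static void main(String[] args) {
        List<Long> removedInDepartments = new ArrayList<>();
        List<Long> removedInEmployees = new ArrayList<>();
        // The "department" and "employee" modules register independently.
        listeners.add(event -> removedInDepartments.add(event.id()));
        listeners.add(event -> removedInEmployees.add(event.id()));
        // The "organization" module only publishes; it never calls the modules.
        publish(new OrganizationRemoveEvent(1L));
        System.out.println(removedInDepartments + " " + removedInEmployees); // prints [1] [1]
    }
}
```

Spring adds transactionality and asynchronous dispatch on top of this basic idea, as shown in the following sections.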

Let's switch to the organization module. In the OrganizationManagement bean we implement several synchronous operations, but we also publish some Spring events using the ApplicationEventPublisher bean (1). Those events are propagated after adding (2) and removing (3) an organization. For example, when we delete an organization, we should also remove all of its departments and employees. We can process those actions asynchronously on the department and employee module side. Our event object contains the id of the organization.

@Service
public class OrganizationManagement implements OrganizationExternalAPI {

   private final ApplicationEventPublisher events; // (1)
   private final OrganizationRepository repository;
   private final DepartmentInternalAPI departmentInternalAPI;
   private final EmployeeInternalAPI employeeInternalAPI;
   private final OrganizationMapper mapper;

   public OrganizationManagement(ApplicationEventPublisher events,
                                 OrganizationRepository repository,
                                 DepartmentInternalAPI departmentInternalAPI,
                                 EmployeeInternalAPI employeeInternalAPI,
                                 OrganizationMapper mapper) {
      this.events = events;
      this.repository = repository;
      this.departmentInternalAPI = departmentInternalAPI;
      this.employeeInternalAPI = employeeInternalAPI;
      this.mapper = mapper;
   }

   @Override
   public OrganizationDTO findByIdWithEmployees(Long id) {
      OrganizationDTO dto = repository.findDTOById(id);
      List<EmployeeDTO> dtos = employeeInternalAPI.getEmployeesByOrganizationId(id);
      dto.employees().addAll(dtos);
      return dto;
   }

   @Override
   public OrganizationDTO findByIdWithDepartments(Long id) {
      OrganizationDTO dto = repository.findDTOById(id);
      List<DepartmentDTO> dtos = departmentInternalAPI.getDepartmentsByOrganizationId(id);
      dto.departments().addAll(dtos);
      return dto;
   }

   @Override
   public OrganizationDTO findByIdWithDepartmentsAndEmployees(Long id) {
      OrganizationDTO dto = repository.findDTOById(id);
      List<DepartmentDTO> dtos = departmentInternalAPI.getDepartmentsByOrganizationIdWithEmployees(id);
      dto.departments().addAll(dtos);
      return dto;
   }

   @Override
   @Transactional
   public OrganizationDTO add(OrganizationDTO organization) {
      OrganizationDTO dto = mapper.organizationToOrganizationDTO(
          repository.save(mapper.organizationDTOToOrganization(organization))
      );
      events.publishEvent(new OrganizationAddEvent(dto.id())); // (2)
      return dto;
   }

   @Override
   @Transactional
   public void remove(Long id) {
      repository.deleteById(id);
      events.publishEvent(new OrganizationRemoveEvent(id)); // (3)
   }

}

Then, the application events may be received by other modules. In order to handle an event, we can use the @ApplicationModuleListener annotation provided by Spring Modulith. It is a shortcut for three different Spring annotations: @Async, @Transactional, and @TransactionalEventListener. In the following fragment of the DepartmentManagement code, we handle the incoming events. For a newly created organization, we add two default departments. After removing an organization, we remove all the departments previously assigned to it.

@ApplicationModuleListener
void onNewOrganizationEvent(OrganizationAddEvent event) {
   LOG.info("onNewOrganizationEvent(orgId={})", event.getId());
   add(new DepartmentDTO(null, event.getId(), "HR"));
   add(new DepartmentDTO(null, event.getId(), "Management"));
}

@ApplicationModuleListener
void onRemovedOrganizationEvent(OrganizationRemoveEvent event) {
   LOG.info("onRemovedOrganizationEvent(orgId={})", event.getId());
   repository.deleteByOrganizationId(event.getId());
}

There's also a similar method for handling OrganizationRemoveEvent in the EmployeeManagement bean.

@ApplicationModuleListener
void onRemovedOrganizationEvent(OrganizationRemoveEvent event) {
   LOG.info("onRemovedOrganizationEvent(orgId={})", event.getId());
   repository.deleteByOrganizationId(event.getId());
}

Spring Modulith comes with a smart mechanism for testing event processing. We create a test for a particular module by placing it in the right package. For example, it is the pl.piomin.services.department package to test the department module. We need to annotate the test class with @ApplicationModuleTest. There are three different bootstrap modes available: STANDALONE, DIRECT_DEPENDENCIES, and ALL_DEPENDENCIES. Spring Modulith also provides the Scenario abstraction. It can be declared as a test method parameter in @ApplicationModuleTest tests. Thanks to that object, we can define a scenario that publishes an event and verifies the result in a single chain of calls.

@ApplicationModuleTest(ApplicationModuleTest.BootstrapMode.DIRECT_DEPENDENCIES)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class DepartmentModuleTests {

    private static final long TEST_ID = 100;

    @Autowired
    DepartmentRepository repository;

    @Test
    @Order(1)
    void shouldAddDepartmentsOnEvent(Scenario scenario) {
        scenario.publish(new OrganizationAddEvent(TEST_ID))
                .andWaitForStateChange(() -> repository.findByOrganizationId(TEST_ID))
                .andVerify(result -> {assert !result.isEmpty();});
    }

    @Test
    @Order(2)
    void shouldRemoveDepartmentsOnEvent(Scenario scenario) {
        scenario.publish(new OrganizationRemoveEvent(TEST_ID))
                .andWaitForStateChange(() -> repository.findByOrganizationId(TEST_ID))
                .andVerify(result -> {assert result.isEmpty();});
    }
}

Exposing Modules API Externally with REST

Finally, let’s switch to the last module in our app – gateway. It doesn’t do much. It is responsible only for exposing some module services outside of the app using REST endpoints. In the first step, we need to inject all the *ExternalAPI beans.

@RestController
@RequestMapping("/api")
public class GatewayManagement {

   private DepartmentExternalAPI departmentExternalAPI;
   private EmployeeExternalAPI employeeExternalAPI;
   private OrganizationExternalAPI organizationExternalAPI;

   public GatewayManagement(DepartmentExternalAPI departmentExternalAPI,
                            EmployeeExternalAPI employeeExternalAPI,
                            OrganizationExternalAPI organizationExternalAPI) {
      this.departmentExternalAPI = departmentExternalAPI;
      this.employeeExternalAPI = employeeExternalAPI;
      this.organizationExternalAPI = organizationExternalAPI;
   }


   @GetMapping("/organizations/{id}/with-departments")
   public OrganizationDTO apiOrganizationWithDepartments(@PathVariable("id") Long id) {
        return organizationExternalAPI.findByIdWithDepartments(id);
   }

   @GetMapping("/organizations/{id}/with-departments-and-employees")
   public OrganizationDTO apiOrganizationWithDepartmentsAndEmployees(@PathVariable("id") Long id) {
      return organizationExternalAPI.findByIdWithDepartmentsAndEmployees(id);
   }

   @PostMapping("/organizations")
   public OrganizationDTO apiAddOrganization(@RequestBody OrganizationDTO o) {
      return organizationExternalAPI.add(o);
   }

   @PostMapping("/employees")
   public EmployeeDTO apiAddEmployee(@RequestBody EmployeeDTO employee) {
      return employeeExternalAPI.add(employee);
   }

   @GetMapping("/departments/{id}/with-employees")
   public DepartmentDTO apiDepartmentWithEmployees(@PathVariable("id") Long id) {
      return departmentExternalAPI.getDepartmentByIdWithEmployees(id);
   }

   @PostMapping("/departments")
   public DepartmentDTO apiAddDepartment(@RequestBody DepartmentDTO department) {
      return departmentExternalAPI.add(department);
   }
}

We can document the REST API exposed by our app using the Springdoc project. Let’s include the following dependency into Maven pom.xml:

<dependency>
   <groupId>org.springdoc</groupId>
   <artifactId>springdoc-openapi-starter-webmvc-ui</artifactId>
   <version>2.2.0</version>
</dependency>

Once we start the app with the mvn spring-boot:run command, we can access Swagger UI with our API documentation under the http://localhost:8080/swagger-ui.html address.


In order to ensure that everything works fine, we can implement some REST-based Spring Boot tests. We don't use any Spring Modulith-specific support here, just the Spring Boot Test features.

@SpringBootTest(webEnvironment = 
                SpringBootTest.WebEnvironment.RANDOM_PORT)
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
public class AppRestControllerTests {

   @Autowired
   TestRestTemplate restTemplate;

   @Test
   @Order(1)
   void shouldAddNewEmployee() {
      EmployeeDTO emp = new EmployeeDTO(null, 1L, 1L, "Test", 30, "HR");
      emp = restTemplate.postForObject("/api/employees", emp, 
                                       EmployeeDTO.class);
      assertNotNull(emp.id());
   }

   @Test
   @Order(1)
   void shouldAddNewDepartment() {
      DepartmentDTO dep = new DepartmentDTO(null, 1L, "Test");
      dep = restTemplate.postForObject("/api/departments", dep, 
                                       DepartmentDTO.class);
      assertNotNull(dep.id());
   }

   @Test
   @Order(2)
   void shouldFindDepartmentWithEmployees() {
      DepartmentDTO dep = restTemplate
         .getForObject("/api/departments/{id}/with-employees",                    
                       DepartmentDTO.class, 1L);
      assertNotNull(dep);
      assertNotNull(dep.id());
   }
}


Documentation and Spring Boot Actuator Support in Spring Modulith

Spring Modulith provides an additional Actuator endpoint that shows the modular structure of the Spring Boot app. We include the following Maven dependencies to use that support:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.modulith</groupId>
   <artifactId>spring-modulith-actuator</artifactId>
   <scope>runtime</scope>
</dependency>

Then, let’s expose all the Actuator endpoints over HTTP by adding the following property to the application.yml file:

management.endpoints.web.exposure.include: "*"

Finally, we can call the modulith endpoint available under the http://localhost:8080/actuator/modulith address. Here’s the JSON response:

{
   "department" : {
      "basePackage" : "pl.piomin.services.department",
      "dependencies" : [
         {
            "target" : "employee",
            "types" : [
               "USES_COMPONENT"
            ]
         }
      ],
      "displayName" : "Department"
   },
   "employee" : {
      "basePackage" : "pl.piomin.services.employee",
      "dependencies" : [],
      "displayName" : "Employee"
   },
   "gateway" : {
      "basePackage" : "pl.piomin.services.gateway",
      "dependencies" : [
         {
            "target" : "employee",
            "types" : [
               "USES_COMPONENT"
            ]
         },
         {
            "target" : "department",
            "types" : [
               "USES_COMPONENT"
            ]
         },
         {
            "target" : "organization",
            "types" : [
               "USES_COMPONENT"
            ]
         }
      ],
      "displayName" : "Gateway"
   },
   "organization" : {
      "basePackage" : "pl.piomin.services.organization",
      "dependencies" : [
         {
            "target" : "employee",
            "types" : [
               "USES_COMPONENT"
            ]
         },
         {
            "target" : "department",
            "types" : [
               "USES_COMPONENT"
            ]
         }
      ],
      "displayName" : "Organization"
   }
}

If you prefer a more graphical form of docs, you can leverage the Spring Modulith Documenter component. You don't need to include anything else, just prepare a simple test that creates and customizes the Documenter object:

public class SpringModulithTests {

   ApplicationModules modules = ApplicationModules
      .of(SpringModulith.class);

   @Test
   void writeDocumentationSnippets() {
      new Documenter(modules)
             .writeModuleCanvases()
             .writeModulesAsPlantUml()
             .writeIndividualModulesAsPlantUml();
   }
}

Once we run the test Spring Modulith will generate the documentation files under the target/spring-modulith-docs directory. Let’s take a look at the UML diagram of our app modules.

Enable Observability

We can enable observability between our application modules with Micrometer. The Spring Boot app will send traces to Zipkin after we add the following list of dependencies:

<dependency>
   <groupId>org.springframework.modulith</groupId>
   <artifactId>spring-modulith-observability</artifactId>
   <scope>runtime</scope>
</dependency>
<dependency>
   <groupId>io.micrometer</groupId>
   <artifactId>micrometer-tracing-bridge-otel</artifactId>
</dependency>
<dependency>
   <groupId>io.opentelemetry</groupId>
   <artifactId>opentelemetry-exporter-zipkin</artifactId>
</dependency>

We can also change the default sampling probability to 1.0 (100% of traces).

management.tracing.sampling.probability: 1.0

We can use Spring Boot support for Docker Compose to start Zipkin together with the app. First, let’s create the docker-compose.yml file in the project root directory.

version: "3.7"
services:
  zipkin:
    container_name: zipkin
    image: openzipkin/zipkin
    extra_hosts: [ 'host.docker.internal:host-gateway' ]
    ports:
      - "9411:9411"

Then, we need to add the following dependency in Maven pom.xml:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-docker-compose</artifactId>
</dependency>

Once we start the app, Spring Boot will try to run the Zipkin container on Docker. In order to access the Zipkin dashboard, visit the http://localhost:9411 address. You will see the trace visualization between the application modules. It works fine for asynchronous event communication. Unfortunately, it doesn't visualize the synchronous communication properly, but maybe I did something wrong, or we need to wait for some improvements in the Spring Modulith project.

The post Guide to Modulith with Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2023/10/13/guide-to-modulith-with-spring-boot/feed/ 26 14587
An Advanced GraphQL with Spring Boot https://piotrminkowski.com/2023/01/18/an-advanced-graphql-with-spring-boot/ https://piotrminkowski.com/2023/01/18/an-advanced-graphql-with-spring-boot/#comments Wed, 18 Jan 2023 11:15:39 +0000 https://piotrminkowski.com/?p=13938 In this article, you will learn how to use Spring for GraphQL in your Spring Boot app. Spring for GraphQL is a relatively new project. The 1.0 version was released a few months ago. Before that release, we had to include third-party libraries to simplify GraphQL implementation in the Spring Boot app. I have already […]

The post An Advanced GraphQL with Spring Boot appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Spring for GraphQL in your Spring Boot app. Spring for GraphQL is a relatively new project. The 1.0 version was released a few months ago. Before that release, we had to include third-party libraries to simplify GraphQL implementation in the Spring Boot app. I have already described two alternative solutions in my previous articles. In the first of them, you will learn about the GraphQL Java Kickstart project. In the other one, you can see how to create some more advanced GraphQL queries with the Netflix DGS library.

We will use a very similar schema and entity model as in those two articles about Spring Boot and GraphQL.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

Firstly, you should go to the sample-app-spring-graphql directory. Our sample Spring Boot app exposes an API over GraphQL and connects to the in-memory H2 database. It uses Spring Data JPA as a layer to interact with the database. There are three entities: Employee, Department, and Organization. Each of them is stored in a separate table. Here's the relationship model.

Getting started with Spring for GraphQL

In addition to the standard Spring Boot modules we need to include the following two dependencies:

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-graphql</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.graphql</groupId>
  <artifactId>spring-graphql-test</artifactId>
  <scope>test</scope>
</dependency>

The spring-graphql-test dependency provides additional capabilities for building unit tests. The starter comes with the required libraries and auto-configuration. However, it does not enable the GraphiQL interface. In order to enable it, we should set the following property in the application.yml file:

spring:
  graphql:
    graphiql:
      enabled: true

By default, Spring for GraphQL tries to load schema files from the src/main/resources/graphql directory. It looks there for files with the .graphqls or .gqls extensions. Let's define the GraphQL schema for the Department entity. The Department type references the two other types: Organization and Employee (the list of employees). There are two queries, for searching all departments and for finding a department by id, and a single mutation for adding a new department.

type Query {
   departments: [Department]
   department(id: ID!): Department!
}

type Mutation {
   newDepartment(department: DepartmentInput!): Department
}

input DepartmentInput {
   name: String!
   organizationId: Int
}

type Department {
   id: ID!
   name: String!
   organization: Organization
   employees: [Employee]
}

The Organization type schema is pretty similar. As a more advanced feature, we need to handle joins to the Employee and Department types.

extend type Query {
   organizations: [Organization]
   organization(id: ID!): Organization!
}

extend type Mutation {
   newOrganization(organization: OrganizationInput!): Organization
}

input OrganizationInput {
   name: String!
}

type Organization {
   id: ID!
   name: String!
   employees: [Employee]
   departments: [Department]
}

The last schema is for the Employee type. Unlike the previous schemas, it defines a type responsible for handling filtering. The EmployeeFilter is able to filter by salary, position, or age. There is also a query method for handling filtering: employeesWithFilter.

extend type Query {
  employees: [Employee]
  employeesWithFilter(filter: EmployeeFilter): [Employee]
  employee(id: ID!): Employee!
}

extend type Mutation {
  newEmployee(employee: EmployeeInput!): Employee
}

input EmployeeInput {
  firstName: String!
  lastName: String!
  position: String!
  salary: Int
  age: Int
  organizationId: Int!
  departmentId: Int!
}

type Employee {
  id: ID!
  firstName: String!
  lastName: String!
  position: String!
  salary: Int
  age: Int
  department: Department
  organization: Organization
}

input EmployeeFilter {
  salary: FilterField
  age: FilterField
  position: FilterField
}

input FilterField {
  operator: String!
  value: String!
}
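To see the filter in action, an example GraphiQL query could look like the sketch below. The operator and value strings ("gt", "lt") are assumptions made for illustration; the actual set of supported operators depends on the generateCriteria logic implemented in the app's FilterField class.

```graphql
query {
  employeesWithFilter(filter: {
    salary: { operator: "gt", value: "10000" }
    age: { operator: "lt", value: "40" }
  }) {
    id
    firstName
    lastName
    salary
  }
}
```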

Create Entities

Do not hold that against me, but I'm using Lombok in the entity implementation. Here's the Employee entity corresponding to the Employee type defined in the GraphQL schema.

@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class Employee {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  @EqualsAndHashCode.Include
  private Integer id;
  private String firstName;
  private String lastName;
  private String position;
  private int salary;
  private int age;
  @ManyToOne(fetch = FetchType.LAZY)
  private Department department;
  @ManyToOne(fetch = FetchType.LAZY)
  private Organization organization;
}

Here we have the Department entity.

@Entity
@Data
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class Department {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  @EqualsAndHashCode.Include
  private Integer id;
  private String name;
  @OneToMany(mappedBy = "department")
  private Set<Employee> employees;
  @ManyToOne(fetch = FetchType.LAZY)
  private Organization organization;
}

Finally, we can take a look at the Organization entity.

@Entity
@Data
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class Organization {
  @Id
  @GeneratedValue(strategy = GenerationType.IDENTITY)
  @EqualsAndHashCode.Include
  private Integer id;
  private String name;
  @OneToMany(mappedBy = "organization")
  private Set<Department> departments;
  @OneToMany(mappedBy = "organization")
  private Set<Employee> employees;
}

Using GraphQL for Spring with Spring Boot

Spring for GraphQL provides an annotation-based programming model using the well-known @Controller pattern. It is also possible to adapt the Querydsl library and use it together with Spring Data JPA. You can then use it in your Spring Data repositories annotated with @GraphQlRepository. In this article, I will use the standard JPA Criteria API for generating more advanced queries with filters and joins.

Let's start with our first controller. In comparison to both previous articles about Netflix DGS and GraphQL Java Kickstart, we will keep queries and mutations in the same class. We need to annotate query methods with @QueryMapping, and mutation methods with @MutationMapping. The last query method, employeesWithFilter, performs advanced filtering based on the dynamic list of fields passed in the input EmployeeFilter type. To pass an input parameter, we should annotate the method argument with @Argument.

@Controller
public class EmployeeController {

   DepartmentRepository departmentRepository;
   EmployeeRepository employeeRepository;
   OrganizationRepository organizationRepository;

   EmployeeController(DepartmentRepository departmentRepository,
                      EmployeeRepository employeeRepository, 
                      OrganizationRepository organizationRepository) {
      this.departmentRepository = departmentRepository;
      this.employeeRepository = employeeRepository;
      this.organizationRepository = organizationRepository;
   }

   @QueryMapping
   public Iterable<Employee> employees() {
       return employeeRepository.findAll();
   }

   @QueryMapping
   public Employee employee(@Argument Integer id) {
       return employeeRepository.findById(id).orElseThrow();
   }

   @MutationMapping
   public Employee newEmployee(@Argument EmployeeInput employee) {
      Department department = departmentRepository
         .findById(employee.getDepartmentId()).get();
      Organization organization = organizationRepository
         .findById(employee.getOrganizationId()).get();
      return employeeRepository.save(new Employee(null, employee.getFirstName(), employee.getLastName(),
                employee.getPosition(), employee.getSalary(), employee.getAge(),
                department, organization));
   }

   @QueryMapping
   public Iterable<Employee> employeesWithFilter(
         @Argument EmployeeFilter filter) {
      Specification<Employee> spec = null;
      if (filter.getSalary() != null)
         spec = bySalary(filter.getSalary());
      if (filter.getAge() != null)
         spec = (spec == null ? byAge(filter.getAge()) : spec.and(byAge(filter.getAge())));
      if (filter.getPosition() != null)
         spec = (spec == null ? byPosition(filter.getPosition()) :
                    spec.and(byPosition(filter.getPosition())));
      if (spec != null)
         return employeeRepository.findAll(spec);
      else
         return employeeRepository.findAll();
   }

   private Specification<Employee> bySalary(FilterField filterField) {
      return (root, query, builder) -> filterField
         .generateCriteria(builder, root.get("salary"));
   }

   private Specification<Employee> byAge(FilterField filterField) {
      return (root, query, builder) -> filterField
         .generateCriteria(builder, root.get("age"));
   }

   private Specification<Employee> byPosition(FilterField filterField) {
      return (root, query, builder) -> filterField
         .generateCriteria(builder, root.get("position"));
   }
}

Here's our JPA repository implementation. In order to use the JPA Criteria API, it needs to extend the JpaSpecificationExecutor interface. The same rule applies to both of the other repositories: DepartmentRepository and OrganizationRepository.

public interface EmployeeRepository extends 
   CrudRepository<Employee, Integer>, JpaSpecificationExecutor<Employee> {
}
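The way employeesWithFilter chains the optional criteria can be sketched with plain java.util.function.Predicate objects, with Predicate.and playing the role of Specification.and. The record and thresholds below are made up for illustration; this is not the article's actual Specification code:

```java
import java.util.List;
import java.util.function.Predicate;

public class FilterChainSketch {

    // Simplified stand-in for the Employee entity.
    record Employee(String position, int salary, int age) {}

    // Mirrors the null-chaining in employeesWithFilter: each criterion is
    // applied only when present, and criteria are combined with AND.
    static Predicate<Employee> buildFilter(Integer minSalary, Integer maxAge, String position) {
        Predicate<Employee> spec = null;
        if (minSalary != null) {
            spec = e -> e.salary() >= minSalary;
        }
        if (maxAge != null) {
            Predicate<Employee> byAge = e -> e.age() <= maxAge;
            spec = (spec == null) ? byAge : spec.and(byAge);
        }
        if (position != null) {
            Predicate<Employee> byPosition = e -> e.position().equals(position);
            spec = (spec == null) ? byPosition : spec.and(byPosition);
        }
        // No criteria at all means "match everything" (findAll in the article).
        return (spec == null) ? e -> true : spec;
    }

    public static void main(String[] args) {
        List<Employee> employees = List.of(
                new Employee("developer", 12000, 30),
                new Employee("tester", 8000, 45));
        Predicate<Employee> filter = buildFilter(10000, 40, "developer");
        System.out.println(employees.stream().filter(filter).count()); // prints 1
    }
}
```

In the real controller, the same chaining happens with Specification objects, which Spring Data JPA translates into a single SQL WHERE clause.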

Now, let's switch to another controller. Here's the implementation of DepartmentController. It shows an example of relationship fetching. We use DataFetchingEnvironment to detect if the input query contains a relationship field. In our case, it may be employees or organization. If any of those fields is present, we add the particular relation to the JOIN statement. The same approach applies to both the department and departments methods.

@Controller
public class DepartmentController {

   DepartmentRepository departmentRepository;
   OrganizationRepository organizationRepository;

   DepartmentController(DepartmentRepository departmentRepository, OrganizationRepository organizationRepository) {
      this.departmentRepository = departmentRepository;
      this.organizationRepository = organizationRepository;
   }

   @MutationMapping
   public Department newDepartment(@Argument DepartmentInput department) {
      Organization organization = organizationRepository
         .findById(department.getOrganizationId()).get();
      return departmentRepository.save(new Department(null, department.getName(), null, organization));
   }

   @QueryMapping
   public Iterable<Department> departments(DataFetchingEnvironment environment) {
      DataFetchingFieldSelectionSet s = environment.getSelectionSet();
      if (s.contains("employees") && !s.contains("organization"))
         return departmentRepository.findAll(fetchEmployees());
      else if (!s.contains("employees") && s.contains("organization"))
         return departmentRepository.findAll(fetchOrganization());
      else if (s.contains("employees") && s.contains("organization"))
         return departmentRepository.findAll(fetchEmployees().and(fetchOrganization()));
      else
         return departmentRepository.findAll();
   }

   @QueryMapping
   public Department department(@Argument Integer id, DataFetchingEnvironment environment) {
      Specification<Department> spec = byId(id);
      DataFetchingFieldSelectionSet selectionSet = environment
         .getSelectionSet();
      if (selectionSet.contains("employees"))
         spec = spec.and(fetchEmployees());
      if (selectionSet.contains("organization"))
         spec = spec.and(fetchOrganization());
      return departmentRepository.findOne(spec).orElseThrow(NoSuchElementException::new);
   }

    private Specification<Department> fetchOrganization() {
        return (root, query, builder) -> {
            Fetch<Department, Organization> f = root
               .fetch("organization", JoinType.LEFT);
            Join<Department, Organization> join = (Join<Department, Organization>) f;
            return join.getOn();
        };
    }

   private Specification<Department> fetchEmployees() {
      return (root, query, builder) -> {
         Fetch<Department, Employee> f = root
            .fetch("employees", JoinType.LEFT);
         Join<Department, Employee> join = (Join<Department, Employee>) f;
         return join.getOn();
      };
   }

   private Specification<Department> byId(Integer id) {
      return (root, query, builder) -> builder.equal(root.get("id"), id);
   }
}
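The selection-set branching in the departments method boils down to a simple decision table. It can be sketched in plain Java, with a plain Set of field names standing in for the DataFetchingFieldSelectionSet (the returned strings are illustrative labels, not real JPQL):

```java
import java.util.Set;

public class SelectionSetSketch {

    // Decides which relations get a JOIN FETCH based on the fields
    // requested in the GraphQL query, mirroring departments(...) above.
    static String resolveFetchPlan(Set<String> selectedFields) {
        boolean employees = selectedFields.contains("employees");
        boolean organization = selectedFields.contains("organization");
        if (employees && organization) return "FETCH employees, organization";
        if (employees) return "FETCH employees";
        if (organization) return "FETCH organization";
        return "NO FETCH";
    }

    public static void main(String[] args) {
        // A query asking only for id, name and the employees relation.
        System.out.println(resolveFetchPlan(Set.of("id", "name", "employees")));
        // prints FETCH employees
    }
}
```

The benefit is that the database join happens only when the client actually asks for the related objects, avoiding needless eager loading.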

Here’s the OrganizationController implementation.

@Controller
public class OrganizationController {

   OrganizationRepository repository;

   OrganizationController(OrganizationRepository repository) {
      this.repository = repository;
   }

   @MutationMapping
   public Organization newOrganization(@Argument OrganizationInput organization) {
      return repository.save(new Organization(null, organization.getName(), null, null));
   }

   @QueryMapping
   public Iterable<Organization> organizations() {
      return repository.findAll();
   }

   @QueryMapping
   public Organization organization(@Argument Integer id, DataFetchingEnvironment environment) {
      Specification<Organization> spec = byId(id);
      DataFetchingFieldSelectionSet selectionSet = environment
         .getSelectionSet();
      if (selectionSet.contains("employees"))
         spec = spec.and(fetchEmployees());
      if (selectionSet.contains("departments"))
         spec = spec.and(fetchDepartments());
      return repository.findOne(spec).orElseThrow();
   }

   private Specification<Organization> fetchDepartments() {
      return (root, query, builder) -> {
         Fetch<Organization, Department> f = root
            .fetch("departments", JoinType.LEFT);
         Join<Organization, Department> join = (Join<Organization, Department>) f;
         return join.getOn();
      };
   }

   private Specification<Organization> fetchEmployees() {
      return (root, query, builder) -> {
         Fetch<Organization, Employee> f = root
            .fetch("employees", JoinType.LEFT);
         Join<Organization, Employee> join = (Join<Organization, Employee>) f;
         return join.getOn();
      };
   }

   private Specification<Organization> byId(Integer id) {
      return (root, query, builder) -> builder.equal(root.get("id"), id);
   }
}

Create Unit Tests

Once we have created the whole logic, it’s time to test it. In the next section, I’ll show you how to use the GraphiQL IDE for that. Here, we are going to focus on unit tests. The simplest way to start with Spring for GraphQL tests is through the GraphQlTester bean. We can use it in a mocked web environment. You can also build tests for the HTTP layer with another bean – HttpGraphQlTester. However, it requires us to provide an instance of WebTestClient.

Here are the tests for the Employee @Controller. Each time we build an inline query using GraphQL notation. We need to annotate the whole test class with @AutoConfigureGraphQlTester. Then we can use the fluent API provided by GraphQlTester to fetch and verify data from the backend. Besides two simple tests, we also verify that the EmployeeFilter works correctly in the findWithFilter method.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
@AutoConfigureGraphQlTester
public class EmployeeControllerTests {

   @Autowired
   private GraphQlTester tester;

   @Test
   void addEmployee() {
      String query = "mutation { newEmployee(employee: { firstName: \"John\" lastName: \"Wick\" position: \"developer\" salary: 10000 age: 20 departmentId: 1 organizationId: 1}) { id } }";
      Employee employee = tester.document(query)
              .execute()
              .path("data.newEmployee")
              .entity(Employee.class)
              .get();
      Assertions.assertNotNull(employee);
      Assertions.assertNotNull(employee.getId());
   }

   @Test
   void findAll() {
      String query = "{ employees { id firstName lastName salary } }";
      List<Employee> employees = tester.document(query)
             .execute()
             .path("data.employees[*]")
             .entityList(Employee.class)
             .get();
      Assertions.assertTrue(employees.size() > 0);
      Assertions.assertNotNull(employees.get(0).getId());
      Assertions.assertNotNull(employees.get(0).getFirstName());
   }

   @Test
   void findById() {
      String query = "{ employee(id: 1) { id firstName lastName salary } }";
      Employee employee = tester.document(query)
             .execute()
             .path("data.employee")
             .entity(Employee.class)
             .get();
      Assertions.assertNotNull(employee);
      Assertions.assertNotNull(employee.getId());
      Assertions.assertNotNull(employee.getFirstName());
   }

   @Test
   void findWithFilter() {
      String query = "{ employeesWithFilter(filter: { salary: { operator: \"gt\" value: \"12000\" } }) { id firstName lastName salary } }";
      List<Employee> employees = tester.document(query)
             .execute()
             .path("data.employeesWithFilter[*]")
             .entityList(Employee.class)
             .get();
      Assertions.assertTrue(employees.size() > 0);
      Assertions.assertNotNull(employees.get(0).getId());
      Assertions.assertNotNull(employees.get(0).getFirstName());
   }
}

The tests for the Department type are very similar. Additionally, we test the join statements in the findById test method by declaring the organization field in the query.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.MOCK)
@AutoConfigureGraphQlTester
public class DepartmentControllerTests {

   @Autowired
   private GraphQlTester tester;

   @Test
   void addDepartment() {
      String query = "mutation { newDepartment(department: { name: \"Test10\" organizationId: 1}) { id } }";
      Department department = tester.document(query)
             .execute()
             .path("data.newDepartment")
             .entity(Department.class)
             .get();
      Assertions.assertNotNull(department);
      Assertions.assertNotNull(department.getId());
   }

   @Test
   void findAll() {
      String query = "{ departments { id name } }";
      List<Department> departments = tester.document(query)
             .execute()
             .path("data.departments[*]")
             .entityList(Department.class)
             .get();
      Assertions.assertTrue(departments.size() > 0);
      Assertions.assertNotNull(departments.get(0).getId());
      Assertions.assertNotNull(departments.get(0).getName());
   }

   @Test
   void findById() {
      String query = "{ department(id: 1) { id name organization { id } } }";
      Department department = tester.document(query)
             .execute()
             .path("data.department")
             .entity(Department.class)
             .get();
      Assertions.assertNotNull(department);
      Assertions.assertNotNull(department.getId());
      Assertions.assertNotNull(department.getOrganization());
      Assertions.assertNotNull(department.getOrganization().getId());
   }
    
}

Whenever you clone my repository, you can be sure that the examples work fine thanks to automated tests. You can always verify the status of the repository build in my CircleCI pipeline.

Testing with GraphiQL

We can easily start the application with the following Maven command:

$ mvn clean spring-boot:run

Once you do it, you can access the GraphiQL tool at http://localhost:8080/graphiql. The app inserts some demo data into the H2 database on startup. GraphiQL provides content assist for building GraphQL queries.
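Here’s a sample query you can test there – a sketch assuming the schema and demo data described earlier, fetching a department together with its organization and employees:

```graphql
{
  department(id: 1) {
    id
    name
    organization {
      id
      name
    }
    employees {
      id
      firstName
      lastName
      salary
    }
  }
}
```

Because the controller inspects the selection set, the joins to organization and employees are only added to the SQL query when those fields actually appear in the GraphQL document.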

Final Thoughts

Spring for GraphQL is a very interesting project, and I will be following its development closely. Besides @Controller support, I tried to use the Querydsl integration with Spring Data JPA repositories. However, I ran into some problems with it, so I decided not to cover that topic in this article. Currently, Spring for GraphQL is the third solid Java framework offering high-level GraphQL support for Spring Boot. My choice is still Netflix DGS, but Spring for GraphQL is under active development, so we can probably expect some new and useful features soon.

The post An Advanced GraphQL with Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2023/01/18/an-advanced-graphql-with-spring-boot/feed/ 3 13938
Kafka Transactions with Spring Boot https://piotrminkowski.com/2022/10/29/kafka-transactions-with-spring-boot/ https://piotrminkowski.com/2022/10/29/kafka-transactions-with-spring-boot/#comments Sat, 29 Oct 2022 08:23:21 +0000 https://piotrminkowski.com/?p=13623 In this article, you will learn how to use Kafka transactions with the Spring Kafka project in your Spring Boot app. In order to run the Kafka cluster we will use Upstash. This article provides a basic introduction to Kafka transactions. If you are looking for more advanced usage and scenarios you may refer to […]

The post Kafka Transactions with Spring Boot appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka transactions with the Spring Kafka project in your Spring Boot app. In order to run the Kafka cluster, we will use Upstash. This article provides a basic introduction to Kafka transactions. If you are looking for more advanced usage and scenarios, you may refer to my article about distributed transactions in microservices. You can also read more about Kafka Streams with the Spring Cloud Stream project in this article.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Getting Started with Kafka in Spring Boot

I have already created a Kafka cluster on Upstash using a web dashboard. All the connection credentials are generated automatically. You can find and copy them on the main page of your cluster.

Assuming we have a username as the KAFKA_USER variable and a password as the KAFKA_PASS variable we need to provide the following Spring configuration in the application.yml file:

spring:
  application.name: transactions-service
  kafka:
    bootstrap-servers: inviting-camel-5620-eu1-kafka.upstash.io:9092
    properties:
      security.protocol: SASL_SSL
      sasl.mechanism: SCRAM-SHA-256
      sasl.jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="${KAFKA_USER}" password="${KAFKA_PASS}";

Here’s a list of required dependencies. Since we exchange JSON messages, we need the Jackson library for serialization or deserialization. Of course, we also need to include Spring Boot starter and Spring Kafka.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>com.fasterxml.jackson.core</groupId>
  <artifactId>jackson-databind</artifactId>
</dependency>

The transactions-service app generates and sends orders. We will create the test topic transactions on app startup.

@SpringBootApplication
public class TransactionsService {

   public static void main(String[] args) {
      SpringApplication.run(TransactionsService.class, args);
   }

   @Bean
   public NewTopic transactionsTopic() {
      return TopicBuilder.name("transactions")
          .partitions(3)
          .replicas(1)
          .build();
   }

}

Enabling Kafka Transactions in Spring Boot

In Kafka, a producer initiates a transaction by making a request to the transaction coordinator. You can find a detailed description of that process in the following article on the Confluent blog.

With Spring Boot, we just need to set the spring.kafka.producer.transaction-id-prefix property to enable transactions. Spring Boot will do the rest by automatically configuring a KafkaTransactionManager bean and wiring it into the listener container. Here’s the part of the configuration responsible for the message producer. We use JsonSerializer to serialize objects into JSON. The transaction id prefix is tx-.

spring:
  kafka:
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      transaction-id-prefix: tx-

In our scenario, we will send 10 messages within a single transaction. In order to observe logs on the consumer side, we set a delay of 1 second between sending subsequent messages.

@Transactional
public void generateAndSendPackage() 
      throws InterruptedException, TransactionException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      Thread.sleep(1000);
   }
}

Enable Transactions on the Kafka Consumer Side

In the first step, we will just print the incoming messages. We need to annotate the listener method with the @KafkaListener. The target topic is transactions, and the consumer group is a. Also, we have to add the @Transactional annotation to enable transaction support for the listen method.

@Service
public class TransactionsListener {

   private static final Logger LOG = LoggerFactory
          .getLogger(TransactionsListener.class);

   @KafkaListener(
          id = "transactions",
          topics = "transactions",
          containerGroup = "a",
          concurrency = "3")
   @Transactional
   public void listen(Order order) {
      LOG.info("{}", order);
   }
}

Let’s run the producer app first. To do that, go to the transactions-service directory and execute the mvn spring-boot:run command. It is a good idea to enable more detailed logs for Spring Kafka transactions. To do that, add the following lines to the application.yml file:

logging:
  level:
    org.springframework.transaction: trace
    org.springframework.kafka.transaction: debug

After that, let’s run the consumer app. In order to do that, go to the accounts-service directory and run the same command as before. You should see the following topic created in the Upstash console:

[Image: kafka-transactions-spring-boot-upstash]

The transactions-service app exposes a REST endpoint for sending messages. It just starts the procedure of generating and sending 10 messages within a single transaction that I mentioned in the previous section. Let’s call the endpoint:

$ curl -X POST http://localhost:8080/transactions

Let’s look at the logs on the producer side. After sending all the messages, it committed the transaction.

[Image: kafka-transactions-spring-boot-logs]

Now, let’s see how it looks on the consumer side. All the messages are received just after being sent by the producer app. That is not what we expected…

In order to verify what happened, we need to take a look at the consumer app logs. Here’s a fragment with the Kafka consumer settings. As you can see, by default Spring Kafka sets the transaction isolation level to read_uncommitted.
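To build intuition, here is a toy model of what the two isolation levels mean. This is my own illustration, not the real broker protocol (Kafka actually uses control records and the last stable offset): transactional records land in the partition log immediately, but a read_committed consumer skips records whose transaction was never committed.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a partition log with transactional records. Illustration only:
// the real broker tracks transactions with control records, not a boolean flag.
public class IsolationLevelDemo {

    record Record(String value, long txId, boolean committed) {}

    static class TopicLog {
        private final List<Record> log = new ArrayList<>();

        // Transactional sends land in the log right away, still uncommitted
        void append(String value, long txId) {
            log.add(new Record(value, txId, false));
        }

        // Committing a transaction marks all of its records as committed
        void commit(long txId) {
            log.replaceAll(r -> r.txId() == txId
                    ? new Record(r.value(), r.txId(), true) : r);
        }

        // read_uncommitted: a consumer sees every record in the log
        List<String> readUncommitted() {
            return log.stream().map(Record::value).toList();
        }

        // read_committed: records of aborted/open transactions are skipped
        List<String> readCommitted() {
            return log.stream().filter(Record::committed)
                    .map(Record::value).toList();
        }
    }

    public static void main(String[] args) {
        TopicLog topic = new TopicLog();
        topic.append("order-1", 1L);
        topic.append("order-2", 1L);
        topic.commit(1L);            // first transaction committed
        topic.append("order-3", 2L); // second transaction rolled back (never committed)

        System.out.println(topic.readUncommitted()); // [order-1, order-2, order-3]
        System.out.println(topic.readCommitted());   // [order-1, order-2]
    }
}
```

This is exactly what we observed above: with the default read_uncommitted level the listener saw every message as it was produced, regardless of the transaction outcome.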

Deep Dive into Transactions with Spring Kafka

In order to solve the problem with transactions from the previous section, we need to change the default isolation level in the application.yml file. Under spring.kafka.consumer.properties, we have to set the isolation.level property to read_committed, as shown below.

spring:
  application.name: accounts-service
  kafka:
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
      properties:
        spring.json.trusted.packages: "*"
        isolation.level: read_committed

After that let’s run the accounts-service app once again.

Now, all the messages have been received after the producer committed the transaction. There are three consumer threads as we set the @KafkaListener concurrency parameter to 3.

[Image: kafka-transactions-spring-boot-producer]

In the next step, we will test the rollback of transactions on the producer side. In order to do that, we will modify the method for generating and sending orders. Now, the generateAndSendPackage method takes a boolean parameter that indicates whether the transaction should be rolled back.

@Transactional
public void generateAndSendPackage(boolean error)
       throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order t = new Order(id++, i+1, i+2, 1000, "NEW");
      ListenableFuture<SendResult<Long, Order>> result =
            kafkaTemplate.send("transactions", t.getId(), t);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

Here are the logs from our test. After sending seven orders (the exception is thrown once i exceeds 5), the method throws a RuntimeException and Spring rolls back the transaction. As expected, the consumer app does not receive any messages.

It is important to know that, by default, Spring rolls back only on unchecked exceptions. In order to roll back on checked exceptions as well, we need to specify the rollbackFor attribute on the @Transactional annotation.

The transactional producer sends messages to the Kafka cluster even before committing the transaction. You could see it in the previous section, where the listener was continuously receiving messages when the isolation level was read_uncommitted. Consequently, even if we roll back a transaction on the producer side, the messages sent before the rollback still reach the Kafka broker. We can see it e.g. in the Upstash live message view for the transactions topic.

[Image: kafka-transactions-spring-boot-live]

Here’s the current value of offsets for all partitions in the transactions topic for the a consumer group. We made a successful commit after sending the first package of 10 messages, and we rolled back the transaction with the second package. The sum of offsets is 10 in that case. But in fact, it is different than the current latest offsets on those partitions.

To verify it, we can, for example, change the consumer group name for the listener to b and start another instance of the accounts-service.

@KafkaListener(
     id = "transactions",
     topics = "transactions",
     containerGroup = "b",
     concurrency = "3")
@Transactional
public void listen(Order order) {
   LOG.info("{}", order);
}

Here’s the current value of offsets for the b consumer group.

Of course, the messages have been rolled back. But the important thing to understand here is that these operations happen on the Kafka broker side. The transaction coordinator changes the values of Kafka offsets. We can easily verify that the consumer won’t receive messages after a rollback even if we set the initial offset to the earliest one with the spring.kafka.consumer.auto-offset-reset property.
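For example, a consumer configuration sketch along these lines (my assumption for illustration – add it to the accounts-service application.yml) resets the group’s position to the earliest offset, and the rolled-back records are still skipped as long as read_committed is set:

```yaml
spring:
  kafka:
    consumer:
      auto-offset-reset: earliest
      properties:
        isolation.level: read_committed
```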

Add Database

In this section, we will extend our scenario with new functionalities. Our app will store the status of orders in the database. Just for demo purposes, we will use an in-memory database H2. There are two dependencies required in this scenario: H2 and Spring Data JPA.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>

Here is the OrderGroup entity that stores the current status of the package (SENT, CONFIRMED, ROLLBACK), the total number of orders in a single package, and the total number of orders processed by the accounts-service.

@Entity
public class OrderGroup {

   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Long id;
   private String status;
   private int totalNoOfOrders;
   private int processedNoOfOrders;

   // GETTERS/SETTERS ...
}

In order to manage the entity we use the Spring Data repository pattern:

public interface OrderGroupRepository extends 
   CrudRepository<OrderGroup, Long> {
}

We will also include a database in the accounts-service app. When it processes the incoming orders it performs transfers between the source and target account. It will store the account balance in the database.

@Entity
public class Account {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private int balance;

   // GETTERS/SETTERS ...
}

The same as before there is a repository bean for managing the Account entity.

public interface AccountRepository extends
   CrudRepository<Account, Long> {
}

We also need to modify the Order message exchanged between the apps. It has to contain the groupId field for processing confirmations.

public class Order {

    private Long id;
    private Long sourceAccountId;
    private Long targetAccountId;
    private int amount;
    private String status;
    private Long groupId;

   // GETTERS/SETTERS ...
}

Here’s the diagram that illustrates our architecture for the described scenario.

[Image: kafka-transactions-spring-boot-arch]

Handling Transactions Across Multiple Resources

After including Spring Data JPA there are two registered TransactionManager beans with the names transactionManager and kafkaTransactionManager. Therefore we need to choose the name of the transaction manager inside the @Transactional annotation. In the first step, we add a new entity to the database. The primary key id is auto-generated in the database and then returned to the object. After that, we get the groupId and generate the sequence of orders within that group. Of course, both operations (saving to the database, sending to Kafka) are part of the same transaction.

@Transactional("kafkaTransactionManager")
public void sendOrderGroup(boolean error) throws InterruptedException {
   OrderGroup og = repository.save(new OrderGroup("SENT", 10, 0));
   generateAndSendPackage(error, og.getId());
}

private void generateAndSendPackage(boolean error, Long groupId)
      throws InterruptedException {
   for (long i = 0; i < 10; i++) {
      Order o = new Order(id++, i+1, i+2, 1000, "NEW", groupId);
      ListenableFuture<SendResult<Long, Order>> result =
         kafkaTemplate.send("transactions", o.getId(), o);
      result.addCallback(callback);
      if (error && i > 5)
         throw new RuntimeException();
      Thread.sleep(1000);
   }
}

The accounts-service app listens for incoming orders. It is processing every single order in a separate transaction. It checks if sufficient funds are in the customer account to make a transfer. If there is enough money, it performs a transfer. Finally, it sends the response to transactions-service with the transaction status. The message is sent to the orders topic.

@KafkaListener(
   id = "transactions",
   topics = "transactions",
   groupId = "a",
   concurrency = "3")
@Transactional("kafkaTransactionManager")
public void listen(Order order) {
   LOG.info("Received: {}", order);
   process(order);
}

private void process(Order order) {
   Account accountSource = repository
      .findById(order.getSourceAccountId())
      .orElseThrow();
   Account accountTarget = repository
      .findById(order.getTargetAccountId())
      .orElseThrow();
   if (accountSource.getBalance() >= order.getAmount()) {
      accountSource.setBalance(accountSource.getBalance() - order.getAmount());
      repository.save(accountSource);
      accountTarget.setBalance(accountTarget.getBalance() + order.getAmount());
      repository.save(accountTarget);
      order.setStatus("PROCESSED");
   } else {
      order.setStatus("FAILED");
   }
   LOG.info("After processing: {}", order);
   kafkaTemplate.send("orders", order.getId(), order);
}

The transactions-service listens for order confirmations on the orders topic. Once it receives the message it increases the number of processed orders within an order group and stores the current result in the database. We should use a default Spring transaction manager since we don’t send any messages to Kafka.

@KafkaListener(
      id = "orders",
      topics = "orders",
      groupId = "a",
      concurrency = "3")
@Transactional("transactionManager")
public void listen(Order order) {
   LOG.info("{}", order);
   OrderGroup og = repository
      .findById(order.getGroupId())
      .orElseThrow();
   if (order.getStatus().equals("PROCESSED")) {      
      og.setProcessedNoOfOrders(og.getProcessedNoOfOrders() + 1);
      og = repository.save(og);
      LOG.info("Current: {}", og);
   }
}

Don’t forget to lock the OrderGroup record during the transaction. Since we are processing messages concurrently (with 3 threads), we need to hold a lock on the OrderGroup record until we update the value of the processedNoOfOrders column:

public interface OrderGroupRepository extends
        CrudRepository<OrderGroup, Long> {

    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Optional<OrderGroup> findById(Long groupId);
}

Let’s test a positive scenario. We will generate a group of orders that should be confirmed. To do that let’s call our endpoint POST /transactions:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'false'

Here are the logs from the accounts-service app:

We can also take a look at the logs generated by the transactions-service app:

Finally, we can verify the current status of our order group by calling the following endpoint:

$ curl -X GET 'http://localhost:8080/transactions'

What happens if we roll back the transaction? Try it by yourself with the following command:

$ curl -X 'POST' 'http://localhost:8080/transactions' \
  -H 'Content-Type: application/json' \
  -d 'true'

Final Thoughts

You can easily handle Kafka transactions with Spring Boot using the Spring Kafka project. You can integrate your app with a database and handle transactions across multiple resources. However, one thing needs to be clarified – Kafka does not support XA transactions, which may result in data inconsistency. Spring does not solve that case; it just performs two transactions in the background. When the @Transactional method exits, Spring Boot commits the database transaction first and then the Kafka transaction. You can reverse that order and commit the Kafka transaction first by configuring the outer method to use the DataSourceTransactionManager and the inner method to use the KafkaTransactionManager.

Can we solve that case somehow? Of course. There is, for example, the Debezium project, which allows you to stream database changes into Kafka topics. With that approach, you can just commit changes in the database and then configure Debezium to send events with those changes to Kafka. For more details about that tool and the outbox pattern, please refer to the article available here.

The post Kafka Transactions with Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2022/10/29/kafka-transactions-with-spring-boot/feed/ 6 13623
Kafka Streams with Spring Cloud Stream https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/ https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/#comments Thu, 11 Nov 2021 10:07:45 +0000 https://piotrminkowski.com/?p=10193 In this article, you will learn how to use Kafka Streams with Spring Cloud Stream. We will build a simple Spring Boot application that simulates the stock market. Based on that example, I’ll try to explain what a streaming platform is and how it differs from a traditional message broker. If you are looking for […]

The post Kafka Streams with Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use Kafka Streams with Spring Cloud Stream. We will build a simple Spring Boot application that simulates the stock market. Based on that example, I’ll try to explain what a streaming platform is and how it differs from a traditional message broker. If you are looking for an intro to the Spring Cloud Stream project you should read my article about it. It describes how to use Spring Cloud Stream with RabbitMQ in order to build event-driven microservices.

In Spring Cloud Stream there are two binders supporting the Kafka platform. We will focus on the second of them – Apache Kafka Streams Binder. You can read more about it in Spring Cloud documentation available here.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Introduction

There are three major types in Kafka Streams – KStream, KTable and GlobalKTable. Spring Cloud Stream supports all of them. We can easily convert a stream to a table and vice versa. To clarify, all Kafka topics are stored as a stream. The difference is in how we consume a topic: we can consume it either as a table or as a stream. KTable takes a stream of records from a topic and reduces it down to unique entries using the key of each message.
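The stream-to-table reduction can be illustrated with a plain-Java sketch (my own simplification, not the Kafka Streams API): consumed as a stream, every record is a separate event; consumed as a table, later records with the same key overwrite earlier ones.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of KStream vs KTable semantics. Not the Kafka
// Streams API – just the idea that a table is the latest value per key.
public class StreamVsTable {

    record KeyValue(String key, String value) {}

    // KStream view: every record is an independent event
    static List<KeyValue> asStream(List<KeyValue> records) {
        return List.copyOf(records);
    }

    // KTable view: later records with the same key overwrite earlier ones
    static Map<String, String> asTable(List<KeyValue> records) {
        Map<String, String> table = new LinkedHashMap<>();
        for (KeyValue r : records) {
            table.put(r.key(), r.value()); // upsert by key
        }
        return table;
    }

    public static void main(String[] args) {
        List<KeyValue> topic = List.of(
            new KeyValue("product-1", "price=1000"),
            new KeyValue("product-2", "price=950"),
            new KeyValue("product-1", "price=1050"));

        System.out.println(asStream(topic).size()); // 3 events in the stream
        System.out.println(asTable(topic));         // {product-1=price=1050, product-2=price=950}
    }
}
```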

Architecture

KStream represents an immutable stream of data where each new record is treated as an INSERT. In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. The stock-service application receives and handles events from those topics. In the first step, it needs to change the key of each message from orderId to productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. The final transaction price is an average of the sell and buy order prices.

[Image: kafka-streams-spring-cloud-concept]

We are building a very simplified version of the stock market platform. Each buy order contains a maximum price at which a customer is expecting to buy a product. On the other hand, each sell order contains a minimum price a customer is ready to sell his product. If the sell order price is not greater than a buy order price for a particular product we may perform a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle such an order, since it is considered expired. Each order contains an amount of product for a transaction. For example, we may sell 100 units for 10 or buy 200 units for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders with other new or partially realized orders. You can see the visualization of that process in the picture below.

[Image: kafka-streams-spring-cloud-arch]
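The matching rules described above can be sketched in plain Java. This is my own simplification of the article’s logic – the actual stock-service does this with Kafka Streams joins, and the helper names here are hypothetical: a transaction happens when the sell price does not exceed the buy price and neither order has expired, the executed amount is the smaller of the two product counts, and the price is the average of both orders.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Optional;

// Plain-Java sketch of the matching rules (hypothetical helper, not the
// actual stock-service implementation, which uses Kafka Streams joins).
public class OrderMatcher {

    // amount holds the order price, as in the article's Order class
    record Order(long id, int productCount, int amount, LocalDateTime creationDate) {}
    record Transaction(long buyOrderId, long sellOrderId, int productCount, int price) {}

    static final Duration VALIDITY = Duration.ofSeconds(10); // orders expire after 10s

    static boolean expired(Order o, LocalDateTime now) {
        return o.creationDate().plus(VALIDITY).isBefore(now);
    }

    static Optional<Transaction> match(Order buy, Order sell, LocalDateTime now) {
        if (expired(buy, now) || expired(sell, now))
            return Optional.empty();                 // expired orders are skipped
        if (sell.amount() > buy.amount())
            return Optional.empty();                 // seller asks more than buyer offers
        int count = Math.min(buy.productCount(), sell.productCount()); // partial fill
        int price = (buy.amount() + sell.amount()) / 2;                // average price
        return Optional.of(new Transaction(buy.id(), sell.id(), count, price));
    }

    public static void main(String[] args) {
        LocalDateTime now = LocalDateTime.now();
        Order buy = new Order(1, 200, 1050, now);  // buy 200 at max 1050
        Order sell = new Order(2, 100, 950, now);  // sell 100 at min 950
        // Matches 100 units at the average price of 1000
        System.out.println(match(buy, sell, now).orElseThrow());
    }
}
```

The leftover 100 units of the buy order would stay open as a partially realized order, waiting to be joined with the next matching sell order.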

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the best way to run it is through Redpanda. Redpanda is a Kafka API-compatible streaming platform. In comparison to Kafka, it is relatively easy to run locally. You just need to have Docker installed. Once you have installed Redpanda on your machine, you need to create a cluster. Since you don’t need a large cluster during development, you can create a single-node instance using the following command:

$ rpk container start

After running, it will print the address of your node. For me, it is 127.0.0.1:50842. So, now I can display a list of created topics using the following command:

$ rpk topic list --brokers 127.0.0.1:50842

Currently, there are no topics created. We don’t need to do anything manually, since Spring Cloud Stream automatically creates missing topics on application startup. In case you would like to remove the Redpanda instance after our exercise, you just need to run the following command:

$ rpk container purge

Perfectly! Our local instance of Kafka is running. After that, we may proceed to the development.

Send events to Kafka with Spring Cloud Stream

In order to generate and send events continuously with Spring Cloud Stream Kafka, we need to define a Supplier bean. In our case, the order-service application generates test data. Each message contains a key and a payload that is serialized to JSON. The message key is the order’s id. We have two Supplier beans since we are sending messages to the two topics. Here’s the Order event class:

@Getter
@Setter
@ToString
@AllArgsConstructor
@NoArgsConstructor
public class Order {
   private Long id;
   private Integer customerId;
   private Integer productId;
   private int productCount;
   @JsonDeserialize(using = LocalDateTimeDeserializer.class)
   @JsonSerialize(using = LocalDateTimeSerializer.class)
   private LocalDateTime creationDate;
   private OrderType type;
   private int amount;
}

Our application uses Lombok and Jackson for message serialization. Of course, we also need to include the Spring Cloud Stream Kafka Binder. Unlike the consumer side, the producer does not use Kafka Streams, since it just generates and sends events.

<dependencies>
  <dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-stream-kafka</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

We have a predefined list of orders just to test our solution. We use MessageBuilder to build a message that contains the header kafka_messageKey and the Order payload.

@SpringBootApplication
@Slf4j
public class OrderService {

   private static long orderId = 0;
   private static final Random r = new Random();

   LinkedList<Order> buyOrders = new LinkedList<>(List.of(
      new Order(++orderId, 1, 1, 100, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 2, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 3, 1, 100, LocalDateTime.now(), OrderType.BUY, 1030),
      new Order(++orderId, 4, 1, 200, LocalDateTime.now(), OrderType.BUY, 1050),
      new Order(++orderId, 5, 1, 200, LocalDateTime.now(), OrderType.BUY, 1000),
      new Order(++orderId, 11, 1, 100, LocalDateTime.now(), OrderType.BUY, 1050)
   ));

   LinkedList<Order> sellOrders = new LinkedList<>(List.of(
      new Order(++orderId, 6, 1, 200, LocalDateTime.now(), OrderType.SELL, 950),
      new Order(++orderId, 7, 1, 100, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 8, 1, 100, LocalDateTime.now(), OrderType.SELL, 1050),
      new Order(++orderId, 9, 1, 300, LocalDateTime.now(), OrderType.SELL, 1000),
      new Order(++orderId, 10, 1, 200, LocalDateTime.now(), OrderType.SELL, 1020)
   ));

   public static void main(String[] args) {
      SpringApplication.run(OrderService.class, args);
   }

   @Bean
   public Supplier<Message<Order>> orderBuySupplier() {
      return () -> {
         if (buyOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(buyOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(buyOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

   @Bean
   public Supplier<Message<Order>> orderSellSupplier() {
      return () -> {
         if (sellOrders.peek() != null) {
            Message<Order> o = MessageBuilder
                  .withPayload(sellOrders.peek())
                  .setHeader(KafkaHeaders.MESSAGE_KEY, Objects.requireNonNull(sellOrders.poll()).getId())
                  .build();
            log.info("Order: {}", o.getPayload());
            return o;
         } else {
            return null;
         }
      };
   }

}

After that, we need to provide some configuration settings inside the application.yml file. Since we use multiple binding beans (in our case, Supplier beans), we have to define the property spring.cloud.stream.function.definition that contains the list of bindable functions. We need to pass the Supplier method names separated by semicolons. In the next few lines, we set the names of the target topics on Kafka and the message key serializer. Of course, we also need to set the address of the Kafka broker.

spring.kafka.bootstrap-servers: ${KAFKA_URL}

spring.cloud.stream.function.definition: orderBuySupplier;orderSellSupplier

spring.cloud.stream.bindings.orderBuySupplier-out-0.destination: orders.buy
spring.cloud.stream.kafka.bindings.orderBuySupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

spring.cloud.stream.bindings.orderSellSupplier-out-0.destination: orders.sell
spring.cloud.stream.kafka.bindings.orderSellSupplier-out-0.producer.configuration.key.serializer: org.apache.kafka.common.serialization.LongSerializer

Before running the application I need to create an environment variable containing the address of the Kafka broker.

$ export KAFKA_URL=127.0.0.1:50842

Then, let’s run our Spring Cloud application using the following Maven command:

$ mvn clean spring-boot:run

Once it starts, the application sends some test orders for the same product (productId=1).

We can also verify the list of topics on our local Kafka instance. Both topics have been automatically created by the Spring Cloud Stream Kafka binder before sending the messages.

Consume Kafka Streams with Spring Cloud Stream

Now, we are going to switch to the stock-service implementation. In order to process streams of events, we need to include the Spring Cloud Stream Kafka Streams binder. Also, our application has an ORM layer for storing data, so we have to include the Spring Data JPA starter and the H2 database.

<dependencies>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-jpa</artifactId>
  </dependency>
  <dependency>
    <groupId>com.h2database</groupId>
    <artifactId>h2</artifactId>
  </dependency>
  <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
  </dependency>
  <dependency>
    <groupId>com.fasterxml.jackson.datatype</groupId>
    <artifactId>jackson-datatype-jsr310</artifactId>
  </dependency>
</dependencies>

In the first step, we are going to merge both streams of orders (buy and sell), insert the Order into the database, and print the event message. You could ask: why do I use a database and ORM layer here when I have a Kafka KTable? Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction – fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams, we need to declare a functional bean that takes KStream as an input parameter. If there are two input sources, we have to use a BiConsumer (for consumption only) or a BiFunction (to consume and send events to a new target stream) bean. In this case, we are not creating a new stream of events, so we can use a BiConsumer.

@Autowired
OrderLogic logic;

@Bean
public BiConsumer<KStream<Long, Order>, KStream<Long, Order>> orders() {
   return (orderBuy, orderSell) -> orderBuy
         .merge(orderSell)
         .peek((k, v) -> {
            log.info("New({}): {}", k, v);
            logic.add(v);
         });
}

After that, we need to add some configuration settings. There are two input topics, so we need to map their names. Also, if we have more than one functional bean we need to set applicationId related to the particular function. For now, it is not required, since we have only a single function. But later, we are going to add other functions for some advanced operations.

spring.cloud.stream.bindings.orders-in-0.destination: orders.buy
spring.cloud.stream.bindings.orders-in-1.destination: orders.sell
spring.cloud.stream.kafka.streams.binder.functions.orders.applicationId: orders

For now, that’s all. You can now run the instance of stock-service using the Maven command mvn spring-boot:run.

Operations on Kafka Streams

Now, we may use some more advanced operations on Kafka Streams than just merging two different streams. In fact, that’s the key logic in our application. We need to join two different order streams into a single one using the productId as a joining key. Since the producer sets orderId as a message key, we first need to invoke the selectKey method for both the orders.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just the first step. Then we need to verify that the price in the buy order is not lower than the price in the sell order.

The next step is to verify that neither of these orders has already been fully realized, since each may also be paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we change the stream key from the productId to the transactionId and send the event to the dedicated transactions topic.

In order to implement the scenario described above, we need to define the BiFunction bean. It takes two input KStreams from orders.buy and orders.sell and creates a new KStream of transaction events sent to the output transactions topic. While joining streams, it uses a 10-second sliding window and invokes the execute method to create a new transaction.

@Bean
public BiFunction<KStream<Long, Order>, KStream<Long, Order>, KStream<Long, Transaction>> transactions() {
   return (orderBuy, orderSell) -> orderBuy
         .selectKey((k, v) -> v.getProductId())
         .join(orderSell.selectKey((k, v) -> v.getProductId()),
               this::execute,
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Integer(), 
                                 new JsonSerde<>(Order.class), 
                                 new JsonSerde<>(Order.class)))
         .filterNot((k, v) -> v == null)
         .map((k, v) -> new KeyValue<>(v.getId(), v))
         .peek((k, v) -> log.info("Done -> {}", v));
}

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), orderSell.getProductCount());
      boolean allowed = logic.performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            Math.min(orderBuy.getProductCount(), orderSell.getProductCount()),
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW");
   } else {
      return null;
   }
}
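The Transaction event class itself is not listed in the article. Based on the constructor call in the execute() method above and the getters used in later sections (getStatus(), getPrice(), getAmount(), getSellOrderId()), a plausible reconstruction — written here without Lombok, although the original repository most likely uses Lombok annotations like the Order class does — might look as follows:

```java
import java.time.LocalDateTime;

// Hypothetical reconstruction of the Transaction event; field names and their
// order are inferred from the execute() method, not copied from the source.
class Transaction {
    private Long id;
    private Long buyOrderId;
    private Long sellOrderId;
    private int amount;      // the number of products exchanged in the transaction
    private int price;       // the agreed unit price, an average of both order prices
    private LocalDateTime creationDate;
    private String status;

    public Transaction() {
        // no-args constructor required for JSON deserialization
    }

    public Transaction(Long id, Long buyOrderId, Long sellOrderId, int amount,
                       int price, LocalDateTime creationDate, String status) {
        this.id = id;
        this.buyOrderId = buyOrderId;
        this.sellOrderId = sellOrderId;
        this.amount = amount;
        this.price = price;
        this.creationDate = creationDate;
        this.status = status;
    }

    public Long getId() { return id; }
    public Long getBuyOrderId() { return buyOrderId; }
    public Long getSellOrderId() { return sellOrderId; }
    public int getAmount() { return amount; }
    public int getPrice() { return price; }
    public LocalDateTime getCreationDate() { return creationDate; }
    public String getStatus() { return status; }
}
```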

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies each order’s realization status and updates it with the current values if possible. Only if the performUpdate() method finishes successfully does the stock-service application create a new transaction.

@Service
public class OrderLogic {

   private OrderRepository repository;

   public OrderLogic(OrderRepository repository) {
      this.repository = repository;
   }

   public Order add(Order order) {
      return repository.save(order);
   }

   @Transactional
   public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
      Order buyOrder = repository.findById(buyOrderId).orElseThrow();
      Order sellOrder = repository.findById(sellOrderId).orElseThrow();
      int buyAvailableCount = buyOrder.getProductCount() - buyOrder.getRealizedCount();
      int sellAvailableCount = sellOrder.getProductCount() - sellOrder.getRealizedCount();
      if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
         buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
         sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
         repository.save(buyOrder);
         repository.save(sellOrder);
         return true;
      } else {
         return false;
      }
   }
}

Here’s our repository class with the findById method. It sets a pessimistic lock on the Order entity during the transaction.

public interface OrderRepository extends CrudRepository<Order, Long> {

  @Lock(LockModeType.PESSIMISTIC_WRITE)
  Optional<Order> findById(Long id);

}

We also need to provide configuration settings for the transaction BiFunction.

spring.cloud.stream.bindings.transactions-in-0.destination: orders.buy
spring.cloud.stream.bindings.transactions-in-1.destination: orders.sell
spring.cloud.stream.bindings.transactions-out-0.destination: transactions
spring.cloud.stream.kafka.streams.binder.functions.transactions.applicationId: transactions

spring.cloud.stream.function.definition: orders;transactions

Use Kafka KTable with Spring Cloud Stream

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. Now, we would like to examine the data generated by our stock-service application. The most important questions are how many transactions were generated and what the volume of transactions was, both globally and per product. Three key statistics related to our transactions are: the number of transactions, the number of products sold/bought during transactions, and the total amount of transactions (price * productCount). Here’s the definition of the object used for counting aggregations.

@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
@ToString
public class TransactionTotal {
   private int count;
   private int productCount;
   private int amount;
}

In order to call an aggregation method, we first need to group the transactions stream by a selected key. In the method visible below, we use the status field as the grouping key. After that, we may invoke an aggregate method that allows us to perform some more complex calculations. In this particular case, we are calculating the number of all executed transactions, the total volume of products, and the total amount. The resulting KTable can be materialized as a state store. Thanks to that, we will be able to query it by the name all-transactions-store.

@Bean
public Consumer<KStream<Long, Transaction>> total() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "all-transactions-store");
   return transactions -> transactions
         .groupBy((k, v) -> v.getStatus(), 
                  Grouped.with(Serdes.String(), new JsonSerde<>(Transaction.class)))
         .aggregate(
                 TransactionTotal::new,
                 (k, v, a) -> {
                    a.setCount(a.getCount() + 1);
                    a.setProductCount(a.getProductCount() + v.getAmount());
                    a.setAmount(a.getAmount() + (v.getPrice() * v.getAmount()));
                    return a;
                 },
                 Materialized.<String, TransactionTotal> as(storeSupplier)
                    .withKeySerde(Serdes.String())
                    .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total: {}", v));
}

The next function performs a similar aggregate operation, but this time per product. Because the Transaction object does not contain information about the product, we first need to join the order to access it. Then we produce a KTable by grouping and aggregating per productId. The same as before, we materialize the aggregation as a state store.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> totalPerProduct() {
   KeyValueBytesStoreSupplier storeSupplier = Stores.persistentKeyValueStore(
                "transactions-per-product-store");
   return (transactions, orders) -> transactions
         .selectKey((k, v) -> v.getSellOrderId())
         .join(orders.selectKey((k, v) -> v.getId()),
               (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
               JoinWindows.of(Duration.ofSeconds(10)),
               StreamJoined.with(Serdes.Long(), 
                  new JsonSerde<>(Transaction.class), 
                  new JsonSerde<>(Order.class)))
         .groupBy((k, v) -> v.getProductId(), 
            Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
         .aggregate(
               TransactionTotal::new,
               (k, v, a) -> {
                  a.setCount(a.getCount() + 1);
                  a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
                  a.setAmount(a.getAmount() + (v.getTransaction().getPrice() * v.getTransaction().getAmount()));
                  return a;
               },
               Materialized.<Integer, TransactionTotal> as(storeSupplier)
                  .withKeySerde(Serdes.Integer())
                  .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
         .toStream()
         .peek((k, v) -> log.info("Total per product({}): {}", k, v));
}
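The TransactionTotalWithProduct class used as the join result above is also not listed. It is just a pair holding the Transaction event together with the productId of the joined order; a hypothetical sketch (with a trimmed Transaction stub repeated here only to keep the example self-contained) could be:

```java
// Trimmed stub of the Transaction event, repeated only so this sketch compiles
// on its own; the application uses the full Transaction class.
class Transaction {
    private final int amount;
    private final int price;

    Transaction(int amount, int price) {
        this.amount = amount;
        this.price = price;
    }

    public int getAmount() { return amount; }
    public int getPrice() { return price; }
}

// Hypothetical pair type produced by joining transactions with orders:
// the transaction plus the productId taken from the matching order.
class TransactionTotalWithProduct {
    private Transaction transaction;
    private Integer productId;

    public TransactionTotalWithProduct() {
        // no-args constructor for JSON deserialization
    }

    public TransactionTotalWithProduct(Transaction transaction, Integer productId) {
        this.transaction = transaction;
        this.productId = productId;
    }

    public Transaction getTransaction() { return transaction; }
    public Integer getProductId() { return productId; }
}
```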

What if we would like to perform aggregations similar to those described above, but only for a particular period of time? We need to invoke the windowedBy method and produce a dedicated state store for such operations.

@Bean
public BiConsumer<KStream<Long, Transaction>, KStream<Long, Order>> latestPerProduct() {
   WindowBytesStoreSupplier storeSupplier = Stores.persistentWindowStore(
      "latest-transactions-per-product-store", Duration.ofSeconds(30), Duration.ofSeconds(30), false);
   return (transactions, orders) -> transactions
      .selectKey((k, v) -> v.getSellOrderId())
      .join(orders.selectKey((k, v) -> v.getId()),
            (t, o) -> new TransactionTotalWithProduct(t, o.getProductId()),
            JoinWindows.of(Duration.ofSeconds(10)),
            StreamJoined.with(Serdes.Long(), new JsonSerde<>(Transaction.class), new JsonSerde<>(Order.class)))
      .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), new JsonSerde<>(TransactionTotalWithProduct.class)))
      .windowedBy(TimeWindows.of(Duration.ofSeconds(30)))
      .aggregate(
            TransactionTotal::new,
            (k, v, a) -> {
               a.setCount(a.getCount() + 1);
               a.setAmount(a.getAmount() + v.getTransaction().getAmount());
               return a;
            },
            Materialized.<Integer, TransactionTotal> as(storeSupplier)
               .withKeySerde(Serdes.Integer())
               .withValueSerde(new JsonSerde<>(TransactionTotal.class)))
      .toStream()
      .peek((k, v) -> log.info("Total per product last 30s({}): {}", k, v));
}

Interactive queries

We have already created and configured all required Kafka Streams with Spring Cloud. Finally, we can execute queries on state stores. This operation is called an interactive query. Let’s create a REST controller for exposing such endpoints with the results. In order to query Kafka Streams state stores with Spring Cloud, we need to inject the InteractiveQueryService bean into the controller.

@RestController
@RequestMapping("/transactions")
public class TransactionController {

   private InteractiveQueryService queryService;

   public TransactionController(InteractiveQueryService queryService) {
      this.queryService = queryService;
   }

   @GetMapping("/all")
   public TransactionTotal getAllTransactionsSummary() {
      ReadOnlyKeyValueStore<String, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("all-transactions-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get("NEW");
   }

   @GetMapping("/product/{productId}")
   public TransactionTotal getSummaryByProductId(@PathVariable("productId") Integer productId) {
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      return keyValueStore.get(productId);
   }

   @GetMapping("/product/latest/{productId}")
   public TransactionTotal getLatestSummaryByProductId(@PathVariable("productId") Integer productId) {
      // the windowed aggregation is materialized as a window store,
      // so it has to be queried with QueryableStoreTypes.windowStore()
      ReadOnlyWindowStore<Integer, TransactionTotal> windowStore =
                queryService.getQueryableStore("latest-transactions-per-product-store",
                        QueryableStoreTypes.windowStore());
      Instant now = Instant.now();
      try (WindowStoreIterator<TransactionTotal> it =
                windowStore.fetch(productId, now.minusSeconds(30), now)) {
         return it.hasNext() ? it.next().value : null;
      }
   }

   @GetMapping("/product")
   public Map<Integer, TransactionTotal> getSummaryByAllProducts() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      ReadOnlyKeyValueStore<Integer, TransactionTotal> keyValueStore =
                queryService.getQueryableStore("transactions-per-product-store",
                        QueryableStoreTypes.keyValueStore());
      KeyValueIterator<Integer, TransactionTotal> it = keyValueStore.all();
      while (it.hasNext()) {
         KeyValue<Integer, TransactionTotal> kv = it.next();
         m.put(kv.key, kv.value);
      }
      return m;
   }

}

Before you run the latest version of the stock-service application, you should generate more diversified random data. Let’s say we would like to generate orders for 5 different products with floating prices. Just uncomment the following fragment of code in the order-service and run the application once again to generate an infinite stream of events.

private static long orderId = 0;
private static final Random r = new Random();

private Map<Integer, Integer> prices = Map.of(
      1, 1000, 
      2, 2000, 
      3, 5000, 
      4, 1500, 
      5, 2500);

@Bean
public Supplier<Message<Order>> orderBuySupplier() {
   return () -> {
      Integer productId = r.nextInt(1, 6);
      int price = prices.get(productId) + r.nextInt(-100, 100);
      Order o = new Order(
         ++orderId,
         r.nextInt(1, 6),
         productId,
         100,
         LocalDateTime.now(),
         OrderType.BUY,
         price);
      log.info("Order: {}", o);
      return MessageBuilder
         .withPayload(o)
         .setHeader(KafkaHeaders.MESSAGE_KEY, orderId)
         .build();
   };
}

You may also want to generate more messages. To do that, you need to decrease the polling delay for the Spring Cloud Stream Supplier.

spring.cloud.stream.poller.fixedDelay: 100

After running both our sample applications you may verify the logs on the stock-service side.

Then you may call our REST endpoints performing interactive queries on the materialized Kafka KTable.

$ curl http://localhost:8080/transactions/all
$ curl http://localhost:8080/transactions/product/3
$ curl http://localhost:8080/transactions/product/latest/5

Looks simple? Well, under the hood it may be quite a bit more complicated 🙂 Here’s the final list of topics automatically created for the needs of our application.

(figure: kafka-streams-spring-cloud-topics)

Final Thoughts

Spring Cloud Stream simplifies working with Kafka Streams and interactive queries. Kafka Streams by itself is a very powerful mechanism. In this article, I showed you how we can use it to implement non-trivial logic and then analyze data in various ways.

The post Kafka Streams with Spring Cloud Stream appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/11/11/kafka-streams-with-spring-cloud-stream/feed/ 21 10193
Express JPA Queries as Java Streams https://piotrminkowski.com/2021/07/13/express-jpa-queries-as-java-streams/ https://piotrminkowski.com/2021/07/13/express-jpa-queries-as-java-streams/#comments Tue, 13 Jul 2021 11:00:11 +0000 https://piotrminkowski.com/?p=9949 In this article, you will learn how to use the JPAstreamer library to express your JPA queries with Java streams. I will also show you how to integrate this library with Spring Boot and Spring Data. The idea around it is very simple but at the same time brilliant. The library creates a SQL query […]

The post Express JPA Queries as Java Streams appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use the JPAstreamer library to express your JPA queries with Java streams. I will also show you how to integrate this library with Spring Boot and Spring Data. The idea around it is very simple but at the same time brilliant. The library creates a SQL query based on your Java stream. That’s all. I have already mentioned this library on my Twitter account.


Before we start, let’s take a look at the following picture. It should explain the concept in a simple way. That’s pretty intuitive, right?

(figure: jpa-java-streams-table)

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions. Let’s begin.

Dependencies and configuration

As an example, we have a simple Spring Boot application that runs an embedded H2 database and exposes data through a REST API. It also uses Spring Data JPA to interact with the database. But with the JPAstreamer library, this is completely transparent for us. So, in the first step, we need to include the following two dependencies. The first of them adds JPAstreamer, while the second integrates it with Spring Boot.

<dependency>
  <groupId>com.speedment.jpastreamer</groupId>
  <artifactId>jpastreamer-core</artifactId>
  <version>1.0.1</version>
</dependency>
<dependency>
  <groupId>com.speedment.jpastreamer.integration.spring</groupId>
  <artifactId>spring-boot-jpastreamer-autoconfigure</artifactId>
  <version>1.0.1</version>
</dependency>

Then, we need to add the Spring Boot Web and JPA starters, the H2 database, and optionally Lombok.

<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
  <groupId>com.h2database</groupId>
  <artifactId>h2</artifactId>
  <scope>runtime</scope>
</dependency>
<dependency>
  <groupId>org.projectlombok</groupId>
  <artifactId>lombok</artifactId>
  <version>1.18.20</version>
</dependency>

I’m using Java 15 for compilation. Because I use Java records for DTOs, which are still a preview feature in this version, I need to enable preview features. Here’s the plugin responsible for it.

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-compiler-plugin</artifactId>
  <version>3.8.1</version>
  <configuration>
    <release>15</release>
    <compilerArgs>
      <arg>--enable-preview</arg>
    </compilerArgs>
    <source>15</source>
    <target>15</target>
  </configuration>
</plugin>

The JPAstreamer library generates source code based on your entity model. Then we may use it, for example, to perform filtering or sorting. But we will talk about that in the next part of the article. For now, let’s configure the build process with build-helper-maven-plugin. JPAstreamer generates the source code in the target/generated-sources/annotations directory, and the plugin registers that directory as a source folder. If you use IntelliJ, it is automatically included as a source folder in your project.

<plugin>
  <groupId>org.codehaus.mojo</groupId>
  <artifactId>build-helper-maven-plugin</artifactId>
  <version>3.2.0</version>
  <executions>
    <execution>
      <phase>generate-sources</phase>
      <goals>
        <goal>add-source</goal>
      </goals>
      <configuration>
        <sources>
          <source>${project.build.directory}/generated-sources/annotations</source>
        </sources>
      </configuration>
    </execution>
  </executions>
</plugin>

Here is the source code generated by JPAstreamer for our entity model.

Entity model for JPA

Let’s take a look at our example entities. Here’s the Employee class. Each employee is assigned to a department and an organization.

@Entity
@NoArgsConstructor
@Getter
@Setter
@ToString
public class Employee {
   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Integer id;
   private String name;
   private String position;
   private int salary;
   @ManyToOne(fetch = FetchType.LAZY)
   private Department department;
   @ManyToOne(fetch = FetchType.LAZY)
   private Organization organization;
}

Here’s the Department entity.

@Entity
@NoArgsConstructor
@Getter
@Setter
public class Department {
   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Integer id;
   private String name;
   @OneToMany(mappedBy = "department")
   private Set<Employee> employees;
   @ManyToOne(fetch = FetchType.LAZY)
   private Organization organization;
}

Here’s the Organization entity.

@Entity
@NoArgsConstructor
@Getter
@Setter
public class Organization {
   @Id
   @GeneratedValue(strategy = GenerationType.IDENTITY)
   private Integer id;
   private String name;
   @OneToMany(mappedBy = "organization")
   private Set<Department> departments;
   @OneToMany(mappedBy = "organization")
   private Set<Employee> employees;
}

We will also use Java records to create DTOs. Here’s a simple DTO for the Employee entity.

public record EmployeeDTO(
   Integer id,
   String name,
   String position,
   int salary
) {
   public EmployeeDTO(Employee emp) {
      this(emp.getId(), emp.getName(), emp.getPosition(), emp.getSalary());
   }
}

We also have a DTO record to express relationship fields.

public record EmployeeWithDetailsDTO(
   Integer id,
   String name,
   String position,
   int salary,
   String organizationName,
   String departmentName
) {
   public EmployeeWithDetailsDTO(Employee emp) {
      this(emp.getId(), emp.getName(), emp.getPosition(), emp.getSalary(),
            emp.getOrganization().getName(),
            emp.getDepartment().getName());
   }
}
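The DepartmentDTO record used in the next section is not listed in the article. Judging by the JSON returned by the /departments endpoint (e.g. {"id":4,"name":"aaa"}), it presumably has two components; the entity-based constructor below is an assumption mirroring the EmployeeDTO pattern, and the Department stub is repeated only to keep the sketch self-contained:

```java
// Trimmed stub of the Department entity, included only so the sketch compiles;
// the application uses the full JPA entity shown earlier.
class Department {
    private final Integer id;
    private final String name;

    Department(Integer id, String name) {
        this.id = id;
        this.name = name;
    }

    public Integer getId() { return id; }
    public String getName() { return name; }
}

// Hypothetical DepartmentDTO record mirroring the EmployeeDTO pattern above.
record DepartmentDTO(Integer id, String name) {
    DepartmentDTO(Department dep) {
        this(dep.getId(), dep.getName());
    }
}
```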

Express JPA queries as Java streams

Let’s begin with a simple example. We would like to get all the departments, sort them in ascending order by the name field, and then convert them to DTOs. We just need to get an instance of the JPAStreamer object and invoke its stream() method. Then we do everything else just as we would with standard Java streams.

@GetMapping
public List<DepartmentDTO> findAll() {
   return streamer.stream(Department.class)
        .sorted(Department$.name)
        .map(DepartmentDTO::new)
        .collect(Collectors.toList());
}

Now, we can call the endpoint after starting our Spring Boot application.

$ curl http://localhost:8080/departments
[{"id":4,"name":"aaa"},{"id":3,"name":"bbb"},{"id":2,"name":"ccc"},{"id":1,"name":"ddd"}]

Let’s take a look at something a little bit more advanced. We are going to find employees with salaries greater than an input value, sort them by salary, and of course map them to DTOs.

@GetMapping("/greater-than/{salary}")
public List<EmployeeDTO> findBySalaryGreaterThan(@PathVariable("salary") int salary) {
   return streamer.stream(Employee.class)
         .filter(Employee$.salary.greaterThan(salary))
         .sorted(Employee$.salary)
         .map(EmployeeDTO::new)
         .collect(Collectors.toList());
}

Then, we call another endpoint.

$ curl http://localhost:8080/employees/greater-than/25000    
[{"id":5,"name":"Test5","position":"Architect","salary":30000},{"id":7,"name":"Test7","position":"Manager","salary":30000},{"id":9,"name":"Test9","position":"Developer","salary":30000}]

We can also perform JPA pagination operations by using the Java stream methods skip and limit.

@GetMapping("/offset/{offset}/limit/{limit}")
public List<EmployeeDTO> findAllWithPagination(
      @PathVariable("offset") int offset, 
      @PathVariable("limit") int limit) {
   return streamer.stream(Employee.class)
         .skip(offset)
         .limit(limit)
         .map(EmployeeDTO::new)
         .collect(Collectors.toList());
}

What is important, all such operations are performed on the database side. Here’s the SQL query generated for the implementation visible above.

What about relationships between entities? Of course, relationships between tables are handled via the JPA provider. In order to perform the JOIN operation with JPAstreamer, we just need to specify the joining stream. By default, it is a LEFT JOIN, but we can customize it when calling the joining() method. In the following fragment of code, we join Department and Organization, which are in a @ManyToOne relationship with the Employee entity.

@GetMapping("/{id}")
public EmployeeWithDetailsDTO findById(@PathVariable("id") Integer id) {
   return streamer.stream(of(Employee.class)
           .joining(Employee$.department)
           .joining(Employee$.organization))
        .filter(Employee$.id.equal(id))
        .map(EmployeeWithDetailsDTO::new)
        .findFirst()
        .orElseThrow();
}

Of course, we can call many other Java stream methods. In the following fragment of code, we count the number of employees assigned to the particular department.

@GetMapping("/{id}/count-employees")
public long getNumberOfEmployees(@PathVariable("id") Integer id) {
   return streamer.stream(Department.class)
         .filter(Department$.id.equal(id))
         .map(Department::getEmployees)
         .mapToLong(Set::size)
         .sum();
}

And the last example today. We get all the employees assigned to a particular department and map each of them to EmployeeDTO.

@GetMapping("/{id}/employees")
public List<EmployeeDTO> getEmployees(@PathVariable("id") Integer id) {
   return streamer.stream(Department.class)
         .filter(Department$.id.equal(id))
         .map(Department::getEmployees)
         .flatMap(Set::stream)
         .map(EmployeeDTO::new)
         .collect(Collectors.toList());
}

Integration with Spring Boot

We can easily integrate JPAstreamer with Spring Boot and Spring Data JPA. In fact, you don't have to do anything more than include the dependency responsible for integration with Spring. It also provides auto-configuration for Spring Data JPA. Therefore, we just need to inject the JPAStreamer bean into the target service or controller.

@RestController
@RequestMapping("/employees")
public class EmployeeController {

   private final JPAStreamer streamer;

   public EmployeeController(JPAStreamer streamer) {
      this.streamer = streamer;
   }

   @GetMapping("/greater-than/{salary}")
   public List<EmployeeDTO> findBySalaryGreaterThan(
      @PathVariable("salary") int salary) {
   return streamer.stream(Employee.class)
        .filter(Employee$.salary.greaterThan(salary))
        .sorted(Employee$.salary)
        .map(EmployeeDTO::new)
        .collect(Collectors.toList());
   }

   // ...

}

Final Thoughts

The concept behind the JPAstreamer library is very interesting, and I really like it. The only disadvantage I found is that it sends certain usage data back to Speedment's servers for Google Analytics, and if you wish to disable this feature, you need to contact their team. To be honest, that concerns me a little. But it doesn't change my opinion that JPAstreamer is a very useful library. If you are interested in topics related to Java streams and collections, you may read my article Using Eclipse Collections.

The post Express JPA Queries as Java Streams appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/07/13/express-jpa-queries-as-java-streams/feed/ 21 9949
An Advanced GraphQL with Spring Boot and Netflix DGS https://piotrminkowski.com/2021/04/08/an-advanced-graphql-with-spring-boot-and-netflix-dgs/ https://piotrminkowski.com/2021/04/08/an-advanced-graphql-with-spring-boot-and-netflix-dgs/#comments Thu, 08 Apr 2021 08:05:27 +0000 https://piotrminkowski.com/?p=9639 In this article, you will learn how to use the Netflix DGS library to simplify GraphQL development with Spring Boot. We will discuss more advanced topics related to GraphQL and databases, like filtering or relationship fetching. I published a similar article some months ago: An Advanced Guide to GraphQL with Spring Boot. However, it is […]

The post An Advanced GraphQL with Spring Boot and Netflix DGS appeared first on Piotr's TechBlog.

]]>
In this article, you will learn how to use the Netflix DGS library to simplify GraphQL development with Spring Boot. We will discuss more advanced topics related to GraphQL and databases, like filtering or relationship fetching. I published a similar article some months ago: An Advanced Guide to GraphQL with Spring Boot. However, it is based on a different library called GraphQL Java Kickstart (https://github.com/graphql-java-kickstart/graphql-spring-boot). Since Netflix DGS was released just a few months ago, you might want to take a look at it. So, that's what we will do now.

Netflix DGS is an annotation-based GraphQL Java library built on top of Spring Boot. Consequently, it is dedicated to Spring Boot applications. Besides the annotation-based programming model, it provides several useful features. Netflix DGS allows generating source code from GraphQL schemas. It simplifies writing unit tests and also supports websockets, file uploads, or GraphQL federation. In order to show you the differences between this library and the previously described Kickstart library, I’ll use the same Spring Boot application as before. Let me just briefly describe our scenario.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. Then you should just follow my instructions.

First, you should go to the sample-app-netflix-dgs directory. The example with GraphQL Java Kickstart is available inside the sample-app-kickstart directory.

As I mentioned before, we use the same schema and entity model as before. I created an application that exposes an API using GraphQL and connects to an H2 in-memory database. We will discuss Spring Boot GraphQL JPA support. For integration with the H2 database, I'm using Spring Data JPA and Hibernate. I have implemented three entities: Employee, Department, and Organization, each of them stored in a separate table. A relationship model between them is visualized in the picture below.

[Image: spring-boot-graphql-netflix-domain]

1. Dependencies for Spring Boot and Netflix GraphQL

Let’s start with dependencies. We need to include Spring Web, Spring Data JPA, and the com.database:h2 artifact for running an in-memory database with our application. Of course, we also have to include Netflix DGS Spring Boot Starter. Here’s a list of required dependencies in Maven pom.xml.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
   <groupId>com.h2database</groupId>
   <artifactId>h2</artifactId>
   <scope>runtime</scope>
</dependency>
<dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
</dependency>
<dependency>
   <groupId>com.netflix.graphql.dgs</groupId>
   <artifactId>graphql-dgs-spring-boot-starter</artifactId>
   <version>${netflix-dgs.spring.version}</version>
</dependency>

2. GraphQL schemas

Before we start implementation, we need to create GraphQL schemas with objects, queries, and mutations. A schema may be defined in multiple graphqls files, but all of them have to be placed inside the /src/main/resources/schemas directory. Thanks to that, the Netflix DGS library detects and loads them automatically.

The GraphQL schema for each entity is located in a separate file. Let's take a look at the department.graphqls file. There is the QueryResolver with two find methods and the MutationResolver with a single method for adding new departments. We also have an input object for mutation and a standard type definition for queries.

type QueryResolver {
   departments: [Department]
   department(id: ID!): Department!
}

type MutationResolver {
   newDepartment(department: DepartmentInput!): Department
}

input DepartmentInput {
   name: String!
   organizationId: Int
}

type Department {
   id: ID!
   name: String!
   organization: Organization
   employees: [Employee]
}

Then we may take a look at the organization.graphqls file. It is a little bit more complicated than the previous schema. As you can see, I'm using the extend keyword on QueryResolver and MutationResolver. That's because we have several files with GraphQL schemas.

extend type QueryResolver {
  organizations: [Organization]
  organization(id: ID!): Organization!
}

extend type MutationResolver {
  newOrganization(organization: OrganizationInput!): Organization
}

input OrganizationInput {
  name: String!
}

type Organization {
  id: ID!
  name: String!
  employees: [Employee]
  departments: [Department]
}

Finally, here's the schema for the Employee entity. In contrast to the previous schemas, it contains objects responsible for filtering, like EmployeeFilter. We also need to define the schema object with the query and mutation roots.

extend type QueryResolver {
  employees: [Employee]
  employeesWithFilter(filter: EmployeeFilter): [Employee]
  employee(id: ID!): Employee!
}

extend type MutationResolver {
  newEmployee(employee: EmployeeInput!): Employee
}

input EmployeeInput {
  firstName: String!
  lastName: String!
  position: String!
  salary: Int
  age: Int
  organizationId: Int!
  departmentId: Int!
}

type Employee {
  id: ID!
  firstName: String!
  lastName: String!
  position: String!
  salary: Int
  age: Int
  department: Department
  organization: Organization
}

input EmployeeFilter {
  salary: FilterField
  age: FilterField
  position: FilterField
}

input FilterField {
  operator: String!
  value: String!
}

schema {
  query: QueryResolver
  mutation: MutationResolver
}

3. Domain Model for GraphQL and Hibernate

We could have generated the Java source code from the previously defined GraphQL schemas. However, I prefer to use Lombok annotations, so I will create the classes manually. Here's the Employee entity corresponding to the Employee object defined in the GraphQL schema.

@Entity
@Data
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class Employee {
   @Id
   @GeneratedValue
   @EqualsAndHashCode.Include
   private Integer id;
   private String firstName;
   private String lastName;
   private String position;
   private int salary;
   private int age;
   @ManyToOne(fetch = FetchType.LAZY)
   private Department department;
   @ManyToOne(fetch = FetchType.LAZY)
   private Organization organization;
}

Also, let’s take a look at the Department entity.

@Entity
@Data
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class Department {
   @Id
   @GeneratedValue
   @EqualsAndHashCode.Include
   private Integer id;
   private String name;
   @OneToMany(mappedBy = "department")
   private Set<Employee> employees;
   @ManyToOne(fetch = FetchType.LAZY)
   private Organization organization;
}

The input objects are much simpler. Just to compare, here’s the DepartmentInput class.

@Data
@NoArgsConstructor
public class DepartmentInput {
   private String name;
   private Integer organizationId;
}

4. Using Netflix DGS with Spring Boot

Netflix DGS provides annotation-based support for Spring Boot. Let's analyze the most interesting features using the example implementation of a query resolver. The EmployeeFetcher is responsible for defining queries related to the Employee object. We should annotate such a class with @DgsComponent (1). We may create our custom context definition to pass data between different methods or even different query resolvers (2). Then, we have to annotate every query method with @DgsData (3). The parentType and field parameters should match the names declared in the GraphQL schemas. We defined three queries in the employee.graphqls file, so we have three methods inside EmployeeFetcher. After fetching all employees, we may save them in our custom context object (4), and then reuse them in other methods or resolvers (5).

The last query method findWithFilter performs advanced filtering based on the dynamic list of fields passed in the input (6). To pass an input parameter we should annotate the method argument with @InputArgument.

@DgsComponent // (1)
public class EmployeeFetcher {

   private EmployeeRepository repository;
   private EmployeeContextBuilder contextBuilder; // (2)

   public EmployeeFetcher(EmployeeRepository repository, 
         EmployeeContextBuilder contextBuilder) {
      this.repository = repository;
      this.contextBuilder = contextBuilder;
    }

   @DgsData(parentType = "QueryResolver", field = "employees") // (3)
   public List<Employee> findAll() {
      List<Employee> employees = (List<Employee>) repository.findAll();
      contextBuilder.withEmployees(employees).build(); // (4)
      return employees;
   }

   @DgsData(parentType = "QueryResolver", field = "employee") 
   public Employee findById(@InputArgument("id") Integer id, 
               DataFetchingEnvironment dfe) {
      EmployeeContext employeeContext = DgsContext.getCustomContext(dfe); // (5)
      List<Employee> employees = employeeContext.getEmployees();
      Optional<Employee> employeeOpt = employees.stream()
         .filter(employee -> employee.getId().equals(id)).findFirst();
      return employeeOpt.orElseGet(() -> 
         repository.findById(id)
            .orElseThrow(DgsEntityNotFoundException::new));
   }

   @DgsData(parentType = "QueryResolver", field = "employeesWithFilter")
   public Iterable<Employee> findWithFilter(@InputArgument("filter") EmployeeFilter filter) { // (6)
      Specification<Employee> spec = null;
      if (filter.getSalary() != null)
         spec = bySalary(filter.getSalary());
      if (filter.getAge() != null)
         spec = (spec == null ? byAge(filter.getAge()) : spec.and(byAge(filter.getAge())));
      if (filter.getPosition() != null)
         spec = (spec == null ? byPosition(filter.getPosition()) :
                spec.and(byPosition(filter.getPosition())));
      if (spec != null)
         return repository.findAll(spec);
      else
         return repository.findAll();
   }

   private Specification<Employee> bySalary(FilterField filterField) {
      return (root, query, builder) -> 
         filterField.generateCriteria(builder, root.get("salary"));
   }

   private Specification<Employee> byAge(FilterField filterField) {
      return (root, query, builder) -> 
         filterField.generateCriteria(builder, root.get("age"));
   }

   private Specification<Employee> byPosition(FilterField filterField) {
      return (root, query, builder) -> 
         filterField.generateCriteria(builder, root.get("position"));
   }
}
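The FilterField class and its generateCriteria method are referenced above but not listed in this article. As a rough, database-free sketch of the idea (the class name, operator names, and matches method below are assumptions for illustration; the real generateCriteria builds a JPA Criteria Predicate from the operator and value fields), the operator dispatch might look like this:

```java
import java.util.function.BiPredicate;

// Database-free sketch of the FilterField idea: an operator name plus a value,
// dispatched to a comparison. The real generateCriteria(builder, path) method
// would return a JPA Criteria Predicate instead of a boolean.
public class FilterFieldSketch {

    private final String operator; // e.g. "gt", "lt", "eq" (assumed operator names)
    private final String value;

    public FilterFieldSketch(String operator, String value) {
        this.operator = operator;
        this.value = value;
    }

    public boolean matches(int fieldValue) {
        int v = Integer.parseInt(value);
        BiPredicate<Integer, Integer> cmp;
        switch (operator) {
            case "gt": cmp = (a, b) -> a > b; break;
            case "lt": cmp = (a, b) -> a < b; break;
            case "eq": cmp = (a, b) -> a.intValue() == b.intValue(); break;
            default: throw new IllegalArgumentException("Unknown operator: " + operator);
        }
        return cmp.test(fieldValue, v);
    }

    public static void main(String[] args) {
        FilterFieldSketch salaryGreaterThan = new FilterFieldSketch("gt", "10000");
        System.out.println(salaryGreaterThan.matches(12000)); // true
        System.out.println(salaryGreaterThan.matches(8000));  // false
    }
}
```

This is why the GraphQL input can combine arbitrary operator and value pairs per field without a dedicated query method for each combination.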

Then, we may switch to the DepartmentFetcher class. It shows an example of relationship fetching. We use DataFetchingEnvironment to detect if the input query contains a relationship field (1). In our case, it may be employees or organization. If any of those fields is defined, we add the relation to the JOIN statement (2). We implement the same approach for both the findById (3) and findAll methods. However, the findById method also uses data stored in the custom context represented by the EmployeeContext bean (4). If the method findAll in EmployeeFetcher has already been invoked, we can fetch employees assigned to the particular department from the context instead of including the relation in the JOIN statement (5).

@DgsComponent
public class DepartmentFetcher {

   private DepartmentRepository repository;

   DepartmentFetcher(DepartmentRepository repository) {
      this.repository = repository;
   }

   @DgsData(parentType = "QueryResolver", field = "departments")
   public Iterable<Department> findAll(DataFetchingEnvironment environment) {
      DataFetchingFieldSelectionSet s = environment.getSelectionSet(); // (1)
      if (s.contains("employees") && !s.contains("organization")) // (2)
         return repository.findAll(fetchEmployees());
      else if (!s.contains("employees") && s.contains("organization"))
         return repository.findAll(fetchOrganization());
      else if (s.contains("employees") && s.contains("organization"))
         return repository.findAll(fetchEmployees().and(fetchOrganization()));
      else
         return repository.findAll();
   }

   @DgsData(parentType = "QueryResolver", field = "department")
   public Department findById(@InputArgument("id") Integer id, 
               DataFetchingEnvironment environment) { // (3)
      Specification<Department> spec = byId(id);
      DataFetchingFieldSelectionSet selectionSet = environment.getSelectionSet();
      EmployeeContext employeeContext = DgsContext.getCustomContext(environment); // (4)
      Set<Employee> employees = null;
      if (selectionSet.contains("employees")) {
         if (employeeContext.getEmployees().size() == 0) // (5)
            spec = spec.and(fetchEmployees());
         else
            employees = employeeContext.getEmployees().stream()
               .filter(emp -> emp.getDepartment().getId().equals(id))
               .collect(Collectors.toSet());
      }
      if (selectionSet.contains("organization"))
         spec = spec.and(fetchOrganization());
      Department department = repository
         .findOne(spec).orElseThrow(DgsEntityNotFoundException::new);
      if (employees != null)
         department.setEmployees(employees);
      return department;
   }

   private Specification<Department> fetchOrganization() {
      return (root, query, builder) -> {
         Fetch<Department, Organization> f = root.fetch("organization", JoinType.LEFT);
         Join<Department, Organization> join = (Join<Department, Organization>) f;
         return join.getOn();
      };
   }

   private Specification<Department> fetchEmployees() {
      return (root, query, builder) -> {
         Fetch<Department, Employee> f = root.fetch("employees", JoinType.LEFT);
         Join<Department, Employee> join = (Join<Department, Employee>) f;
         return join.getOn();
      };
   }

   private Specification<Department> byId(Integer id) {
      return (root, query, builder) -> builder.equal(root.get("id"), id);
   }
}
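Both fetchers depend on the EmployeeContext and EmployeeContextBuilder classes, which this article does not list. Here is a minimal, self-contained sketch of their shape (names inferred from the calls above; in the real application the builder would implement DGS's DgsCustomContextBuilder so the framework creates the context per request, and the list would hold Employee entities rather than strings):

```java
import java.util.ArrayList;
import java.util.List;

// Per-request cache of the employees loaded by EmployeeFetcher.findAll().
// Plain strings are used here only to keep the sketch self-contained.
class EmployeeContext {
    private final List<String> employees;

    EmployeeContext(List<String> employees) {
        this.employees = employees;
    }

    List<String> getEmployees() {
        return employees;
    }
}

public class EmployeeContextBuilderSketch {

    private List<String> employees = new ArrayList<>();

    // Fluent setter matching the withEmployees(...).build() usage in the fetcher
    public EmployeeContextBuilderSketch withEmployees(List<String> employees) {
        this.employees = employees;
        return this;
    }

    public EmployeeContext build() {
        return new EmployeeContext(employees);
    }

    public static void main(String[] args) {
        EmployeeContext ctx = new EmployeeContextBuilderSketch()
                .withEmployees(List.of("Test1", "Test2"))
                .build();
        System.out.println(ctx.getEmployees()); // prints [Test1, Test2]
    }
}
```

The point of the design is simply to let one resolver reuse data loaded by another within the same request instead of issuing a second query.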

In comparison to the data fetchers, the implementation of mutation handlers is rather simple. We just need to define a single method for adding new entities. Here's the implementation of DepartmentMutation.

@DgsComponent
public class DepartmentMutation {

   private DepartmentRepository departmentRepository;
   private OrganizationRepository organizationRepository;

   DepartmentMutation(DepartmentRepository departmentRepository, 
               OrganizationRepository organizationRepository) {
      this.departmentRepository = departmentRepository;
      this.organizationRepository = organizationRepository;
   }

   @DgsData(parentType = "MutationResolver", field = "newDepartment")
   public Department newDepartment(DepartmentInput input) {
      Organization organization = organizationRepository
         .findById(input.getOrganizationId())
         .orElseThrow();
      return departmentRepository
         .save(new Department(null, input.getName(), null, organization));
   }

}

5. Running Spring Boot application and testing Netflix GraphQL support

The last step in our exercise is to run and test the Spring Boot application. It inserts some test data into the H2 database on startup. So, let's just use the GraphiQL tool to run test queries. It is automatically included in the application by the Netflix DGS library. We may display it by invoking the URL http://localhost:8080/graphiql.

In the first step, we run the GraphQL query responsible for fetching all employees with their departments. The method that handles the query also builds a custom context and stores all existing employees there.

[Image: spring-boot-graphql-netflix-query]

Then, we may run a query responsible for finding a single department by its id. We will fetch both relations: the one-to-many with Employee and the many-to-one with Organization.

While the Organization entity is fetched using the JOIN statement, Employee is taken from the context. Here’s the SQL query generated for our current scenario.

[Image: spring-boot-graphql-netflix-query-next]

Finally, we can test our filtering feature. Let’s filter employees using salary and age criteria.

Let’s take a look at the SQL query for the recently called method.

Final Thoughts

Netflix DGS seems to be an interesting alternative to other libraries that provide support for GraphQL with Spring Boot. It was open-sourced just a few weeks ago, but it is already a rather stable solution. I guess that before releasing it publicly, the Netflix team had battle-tested it internally. I like its annotation-based programming style and several other features. I hope this article helps you get started with Netflix DGS.

The post An Advanced GraphQL with Spring Boot and Netflix DGS appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/04/08/an-advanced-graphql-with-spring-boot-and-netflix-dgs/feed/ 20 9639
Spring Boot Tips, Tricks and Techniques https://piotrminkowski.com/2021/01/13/spring-boot-tips-tricks-and-techniques/ https://piotrminkowski.com/2021/01/13/spring-boot-tips-tricks-and-techniques/#comments Wed, 13 Jan 2021 11:10:29 +0000 https://piotrminkowski.com/?p=9354 In this article, I will show you some tips and tricks that help you in building the Spring Boot application efficiently. I hope you will find there tips and techniques that help to boost your productivity in Spring Boot development. Of course, that’s my private list of favorite features. You may find some others by […]

The post Spring Boot Tips, Tricks and Techniques appeared first on Piotr's TechBlog.

]]>
In this article, I will show you some tips and tricks that will help you build Spring Boot applications efficiently. I hope the tips and techniques you find here boost your productivity in Spring Boot development. Of course, this is my private list of favorite features. You may find some others by yourself, for example on the Spring "How-to" Guides site.

I have already published all these Spring Boot tips on Twitter in the graphical form visible below. You may find them using the #SpringBootTip hashtag. I'm a huge fan of Spring Boot. So, if you have suggestions or your own favorite features, just ping me on Twitter (@piotr_minkowski). I will definitely retweet your tweet 🙂

[Image: spring-boot-tips]

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that, you need to clone my GitHub repository. Then you should execute the command mvn clean package spring-boot:run to build and run the sample application. This application uses an embedded H2 database and exposes a REST API. Of course, it demonstrates all the features described in this article. If you have any suggestions, don't be afraid to create a pull request!

Tip 1. Use a random HTTP port in tests

Let’s begin with some Spring Boot testing tips. You should not use a static port in your Spring Boot tests. In order to set this option for the particular test you need to use the webEnvironment field in @SpringBootTest. So, instead of a default DEFINED_PORT provide the RANDOM_PORT value. Then, you can inject a port number into the test with the @LocalServerPort annotation.

@SpringBootTest(webEnvironment = 
   SpringBootTest.WebEnvironment.RANDOM_PORT)
public class AppTest {

   @LocalServerPort
   private int port;

   @Test
   void test() {
      Assertions.assertTrue(port > 0);
   }
}

Tip 2. Use @DataJpaTest to test the JPA layer

Typically for integration testing, you probably use @SpringBootTest to annotate the test class. The problem with it is that it starts the whole application context. This in turn increases the total time required for running your tests. Instead, you may use @DataJpaTest, which starts only JPA components and @Repository beans. By default, it logs SQL queries, so a good idea is to disable that with the showSql field. Moreover, if you want to include beans annotated with @Service or @Component in the test, you may use the @Import annotation.

@DataJpaTest(showSql = false)
@Import(TipService.class)
public class TipsControllerTest {

    @Autowired
    private TipService tipService;

    @Test
    void testFindAll() {
        List<Tip> tips = tipService.findAll();
        Assertions.assertEquals(3, tips.size());
    }
}

Be careful with changing test annotations if you have multiple integration tests in your application. Since such a change modifies the configuration of your application context, it may prevent the context from being reused between tests. You can read more about it in the following article by Philip Riecks.

Tip 3. Rollback transaction after each test

Let’s begin with an embedded, in-memory database. In general, you should rollback all changes performed during each test. The changes during a particular test should not have an influence on the result of another test. However, don’t try to rollback such changes manually! For example, you should not remove a new entity added during the test as shown below.

 public void testAdd() {
     Tip tip = tipRepository.save(new Tip(null, "Tip1", "Desc1"));
     Assertions.assertNotNull(tip);
     tipRepository.deleteById(tip.getId());
 }

Spring Boot comes with a very handy solution for that case. You just need to annotate the test class with @Transactional. Rollback is the default behavior in the test mode, so nothing else is required here. But remember – it works properly only on the client’s side. If your application performs a transaction on the server’s side, it will not be rolled back.

@SpringBootTest
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@Transactional
public class TipsRepositoryTest {

    @Autowired
    private TipRepository tipRepository;

    @Test
    @Order(1)
    public void testAdd() {
        Tip tip = tipRepository.save(new Tip(null, "Tip1", "Desc1"));
        Assertions.assertNotNull(tip);
    }

    @Test
    @Order(2)
    public void testFindAll() {
        Iterable<Tip> tips = tipRepository.findAll();
        Assertions.assertEquals(0, ((List<Tip>) tips).size());
    }
}

In some cases, you won’t use an in-memory, embedded database in your tests. For example, if you have a complex data structure, you may want to check committed data instead of debugging if your tests fail. Therefore you need to use an external database, and commit data after each test. Each time you should start your tests with a cleanup.

Tip 4. Multiple Spring Conditions with logical “OR”

What if you would like to define multiple conditions with @Conditional on a Spring bean? By default, Spring Boot combines all defined conditions with logical “AND”. In the example code visible below, a target bean would be available only if MyBean1 and MyBean2 exist and the property multipleBeans.enabled is defined.

@Bean
@ConditionalOnProperty("multipleBeans.enabled")
@ConditionalOnBean({MyBean1.class, MyBean2.class})
public MyBean myBean() {
   return new MyBean();
}

In order to define multiple "OR" conditions, you need to create a class that extends AnyNestedCondition and put all your conditions there. Then you should use that class with the @Conditional annotation as shown below.

public class MyBeansOrPropertyCondition extends AnyNestedCondition {

    public MyBeansOrPropertyCondition() {
        super(ConfigurationPhase.REGISTER_BEAN);
    }

    @ConditionalOnBean(MyBean1.class)
    static class MyBean1ExistsCondition {}

    @ConditionalOnBean(MyBean2.class)
    static class MyBean2ExistsCondition {}

    @ConditionalOnProperty("multipleBeans.enabled")
    static class MultipleBeansPropertyExists {}
}

@Bean
@Conditional(MyBeansOrPropertyCondition.class)
public MyBean myBean() {
   return new MyBean();
}

Tip 5. Inject Maven data into an application

You may choose between two options that allow injecting Maven data into an application. Firstly, you can use a special placeholder with the project prefix and @ delimiter in the application.properties file.

maven.app=@project.artifactId@:@project.version@

Then, you just need to inject a property into the application using @Value annotation.

@SpringBootApplication
public class TipsApp {
   @Value("${maven.app}")
   private String name;
}

On the other hand, you may use BuildProperties bean as shown below. It stores data available in the build-info.properties file.

@SpringBootApplication
public class TipsApp {

   @Autowired
   private BuildProperties buildProperties;

   @PostConstruct
   void init() {
      log.info("Maven properties: {}, {}", 
	     buildProperties.getArtifact(), 
	     buildProperties.getVersion());
   }
}

In order to generate build-info.properties, you execute the build-info goal provided by the Spring Boot Maven Plugin.

$ mvn package spring-boot:build-info

Tip 6. Inject Git data into an application

Sometimes, you may want to access Git data inside your Spring Boot application. In order to do that, you first need to add the git-commit-id-plugin to your Maven plugins. During the build, it generates the git.properties file.

<plugin>
   <groupId>pl.project13.maven</groupId>
   <artifactId>git-commit-id-plugin</artifactId>
   <configuration>
      <failOnNoGitDirectory>false</failOnNoGitDirectory>
   </configuration>
</plugin>

Finally, you may inject the content of the git.properties file into the application using the GitProperties bean.

@SpringBootApplication
public class TipsApp {

   @Autowired
   private GitProperties gitProperties;

   @PostConstruct
   void init() {
      log.info("Git properties: {}, {}", 
	     gitProperties.getCommitId(), 
	     gitProperties.getCommitTime());
   }
}

Tip 7. Insert initial non-production data

Sometimes, you need to insert some data on application startup for demo purposes. You can also use such an initial data set to test your application manually during development. In order to achieve it, you just need to put the data.sql file on the classpath. Typically, you will place it somewhere inside the src/main/resources directory. Then you can easily filter out such a file during a non-dev build.

insert into tip(title, description) values ('Test1', 'Desc1');
insert into tip(title, description) values ('Test2', 'Desc2');
insert into tip(title, description) values ('Test3', 'Desc3');

However, if you need to generate a large data set, or you are just not convinced about the solution with data.sql, you can insert data programmatically. In that case, it is important to activate the feature only in a specific profile.

@Profile("demo")
@Component
public class ApplicationStartupListener implements 
      ApplicationListener<ApplicationReadyEvent> {

   @Autowired
   private TipRepository repository;

   public void onApplicationEvent(final ApplicationReadyEvent event) {
      repository.save(new Tip("Test1", "Desc1"));
      repository.save(new Tip("Test2", "Desc2"));
      repository.save(new Tip("Test3", "Desc3"));
   }
}

Tip 8. Configuration properties instead of @Value

You should not use @Value for injection if you have multiple properties with the same prefix (e.g. app). Instead, use @ConfigurationProperties with constructor injection. You can mix it with Lombok @AllArgsConstructor and @Getter. Just remember that the class has to be registered, for example with @EnableConfigurationProperties or @ConfigurationPropertiesScan.

@ConfigurationProperties("app")
@AllArgsConstructor
@Getter
@ToString
public class TipsAppProperties {
    private final String name;
    private final String version;
}

@SpringBootApplication
public class TipsApp {

    @Autowired
    private TipsAppProperties properties;
	
}

Tip 9. Error handling with Spring MVC

Spring MVC exception handling is very important to make sure you are not leaking server exceptions to the client. Currently, there are two recommended approaches to handling exceptions. In the first of them, you use a global error handler with the @ControllerAdvice and @ExceptionHandler annotations. Obviously, a good practice is to handle all the business exceptions thrown by your application and assign HTTP codes to them. By default, Spring MVC returns the HTTP 500 code for an unhandled exception.

@ControllerAdvice
public class TipNotFoundHandler {

    @ResponseStatus(HttpStatus.NO_CONTENT)
    @ExceptionHandler(NoSuchElementException.class)
    public void handleNotFound() {

    }
}

You can also handle every exception locally inside the controller method. In that case, you just need to throw ResponseStatusException with a particular HTTP code.

@GetMapping("/{id}")
public Tip findById(@PathVariable("id") Long id) {
   try {
      return repository.findById(id).orElseThrow();
   } catch (NoSuchElementException e) {
      log.error("Not found", e);
      throw new ResponseStatusException(HttpStatus.NO_CONTENT);
   }
}

Tip 10. Ignore a non-existent config file

In general, the application should not fail to start if a configuration file does not exist. Especially since you can set default values for the properties. Since the default behavior of the Spring application is to fail to start in case of a missing configuration file, you need to change it. Set the spring.config.on-not-found property to ignore.

$ java -jar target/spring-boot-tips.jar \
   --spring.config.additional-location=classpath:/add.properties \
   --spring.config.on-not-found=ignore

There is another handy solution to avoid startup failure. You can use the optional keyword in the config file location as shown below.

$ java -jar target/spring-boot-tips.jar \
   --spring.config.additional-location=optional:classpath:/add.properties

Tip 11. Different levels of configuration

You can change the default location of the Spring configuration file with the spring.config.location property. The priority of property sources is determined by the order of files in the list: the last file has the highest priority. This feature allows you to define different levels of configuration, from general settings to the most application-specific ones. So, let’s assume we have a global configuration file with the content visible below.

property1=Global property1
property2=Global property2

Also, we have an application-specific configuration file as shown below. It contains the property with the same name as the property in a global configuration file.

property1=App specific property1

And here’s a JUnit test that verifies that feature.

@SpringBootTest(properties = {
    "spring.config.location=classpath:/global.properties,classpath:/app.properties"
})
public class TipsAppTest {

    @Value("${property1}")
    private String property1;
    @Value("${property2}")
    private String property2;
    
    @Test
    void testProperties() {
        Assertions.assertEquals("App specific property1", property1);
        Assertions.assertEquals("Global property2", property2);
    }
}
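The last-wins overlay behavior can be simulated outside of Spring with a few lines of plain Java. This is only an illustration of the precedence rule, not Spring's actual property-loading code; class and method names are my own.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class PropertyPrecedenceDemo {

    // Overlays maps in order: entries from later maps override earlier ones,
    // mirroring how the last file listed in spring.config.location wins.
    static Map<String, String> overlay(List<Map<String, String>> sources) {
        Map<String, String> result = new LinkedHashMap<>();
        for (Map<String, String> source : sources) {
            result.putAll(source);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> global = Map.of(
                "property1", "Global property1",
                "property2", "Global property2");
        Map<String, String> app = Map.of("property1", "App specific property1");

        Map<String, String> effective = overlay(List.of(global, app));
        System.out.println(effective.get("property1")); // App specific property1
        System.out.println(effective.get("property2")); // Global property2
    }
}
```

The overlay order matches the file order in the JUnit test above: app.properties comes last, so its property1 value wins, while property2 falls through to the global file.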

Tip 12. Deploy Spring Boot on Kubernetes

With the Dekorate project, you don’t have to create any Kubernetes YAML manifests manually. Firstly, you need to include the io.dekorate:kubernetes-spring-starter dependency. Then you can use annotations like @KubernetesApplication to add some new parameters into the generated YAML manifest or override defaults.

@SpringBootApplication
@KubernetesApplication(replicas = 2,
    envVars = { 
       @Env(name = "propertyEnv", 
            value = "Hello from env!"
       ),
       @Env(name = "propertyFromMap", 
            value = "property1", 
            configmap = "sample-configmap"
       ) 
    },
    expose = true,
    ports = @Port(name = "http", containerPort = 8080),
    labels = @Label(key = "version", value = "v1")
)
@JvmOptions(server = true, xmx = 256, gc = GarbageCollector.SerialGC)
public class TipsApp {

    public static void main(String[] args) {
        SpringApplication.run(TipsApp.class, args);
    }

}

After that, you need to set dekorate.build and dekorate.deploy parameters to true in your Maven build command. It automatically generates manifests and deploys the Spring Boot application on Kubernetes. If you use Skaffold for deploying applications on Kubernetes you can easily integrate it with Dekorate. To read more about the details please refer to the following article.

$ mvn clean install -Ddekorate.build=true -Ddekorate.deploy=true

Tip 13. Generate a random HTTP port

Finally, we may proceed to the last of the Spring Boot tips described in this article. You probably know this feature, but I must mention it here. Spring Boot assigns a random, free port to the web application if you set the server.port property to 0.

server.port=0

You can set a random port in a custom predefined range, e.g. 8000-8100. However, there is no guarantee that a generated port will be unassigned.

server.port=${random.int(8000,8100)}
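If you need a stronger guarantee than random.int provides, one common workaround is to probe the range yourself and pass a known-free port to the application. Here is a minimal plain-Java sketch of that idea (illustrative only, not a Spring Boot feature; names are my own):

```java
import java.io.IOException;
import java.net.ServerSocket;

public class FreePortFinder {

    // Tries to bind each port in [from, to]; returns the first one that is free.
    static int findFreePort(int from, int to) {
        for (int port = from; port <= to; port++) {
            try (ServerSocket socket = new ServerSocket(port)) {
                return socket.getLocalPort();
            } catch (IOException e) {
                // port already taken, try the next one
            }
        }
        throw new IllegalStateException("No free port in range " + from + "-" + to);
    }

    public static void main(String[] args) {
        System.out.println("Free port: " + findFreePort(8000, 8100));
    }
}
```

Note there is still a small race window between releasing the probe socket and the server binding the port, so this only reduces the chance of a clash rather than eliminating it.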

The post Spring Boot Tips, Tricks and Techniques appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2021/01/13/spring-boot-tips-tricks-and-techniques/feed/ 6 9354
An Advanced Guide to GraphQL with Spring Boot https://piotrminkowski.com/2020/07/31/an-advanced-guide-to-graphql-with-spring-boot/ https://piotrminkowski.com/2020/07/31/an-advanced-guide-to-graphql-with-spring-boot/#comments Fri, 31 Jul 2020 09:31:27 +0000 http://piotrminkowski.com/?p=8220 In this guide I’m going to discuss some more advanced topics related to GraphQL and databases, like filtering or relationship fetching. Of course, before proceeding to the more advanced issues I will take a moment to describe the basics – something you can be found in many other articles. If you already had the opportunity […]

The post An Advanced Guide to GraphQL with Spring Boot appeared first on Piotr's TechBlog.

]]>
In this guide I’m going to discuss some more advanced topics related to GraphQL and databases, like filtering or relationship fetching. Of course, before proceeding to the more advanced issues I will take a moment to describe the basics – something you can find in many other articles. If you have already had the opportunity to familiarize yourself with the concept of GraphQL, you may have some questions. Probably one of them is: “Ok. It’s nice. But what if I would like to use GraphQL in a real application that connects to a database and provides an API for more advanced queries?”.
If that is your main question, my current article is definitely for you. If you are thinking about using GraphQL in your microservices architecture you may also refer to my previous article GraphQL – The Future of Microservices?.

Example

As you know, it is best to learn from examples, so I have created a sample Spring Boot application that exposes an API using GraphQL and connects to an H2 in-memory database. We will discuss Spring Boot GraphQL JPA support. For integration with the H2 database I’m using Spring Data JPA and Hibernate. I have implemented three entities, Employee, Department and Organization – each of them stored in a separate table. The relationship model between them is visualized in the picture below.

[Diagram: relationship model between the Employee, Department and Organization entities]

A source code with sample application is available on GitHub in repository: https://github.com/piomin/sample-spring-boot-graphql.git

1. Dependencies

Let’s start with the dependencies. Here’s a list of required dependencies for our application. We need to include the Spring Web and Spring Data JPA projects and the com.h2database:h2 artifact for embedding an in-memory database in our application. I’m also using a Spring Boot library offering support for GraphQL. In fact, you may find some other Spring Boot GraphQL JPA libraries, but the one under group com.graphql-java-kickstart (https://www.graphql-java-kickstart.com/spring-boot/) seems to be actively developed and maintained.


<properties>
   <graphql.spring.version>7.1.0</graphql.spring.version>
</properties>
<dependencies>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-web</artifactId>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-data-jpa</artifactId>
   </dependency>
   <dependency>
      <groupId>com.h2database</groupId>
      <artifactId>h2</artifactId>
      <scope>runtime</scope>
   </dependency>
   <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
   </dependency>
   <dependency>
      <groupId>com.graphql-java-kickstart</groupId>
      <artifactId>graphql-spring-boot-starter</artifactId>
      <version>${graphql.spring.version}</version>
   </dependency>
   <dependency>
      <groupId>com.graphql-java-kickstart</groupId>
      <artifactId>graphiql-spring-boot-starter</artifactId>
      <version>${graphql.spring.version}</version>
   </dependency>
   <dependency>
      <groupId>com.graphql-java-kickstart</groupId>
      <artifactId>graphql-spring-boot-starter-test</artifactId>
      <version>${graphql.spring.version}</version>
      <scope>test</scope>
   </dependency>
   <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
   </dependency>
</dependencies>

2. Schemas

We start the implementation by defining GraphQL schemas with object, query and mutation definitions. The files are located inside the /src/main/resources/graphql directory, and after adding graphql-spring-boot-starter they are automatically detected by the application based on their *.graphqls suffix.
The GraphQL schema for each entity is located in a separate file. Let’s take a look at department.graphqls. It’s a very trivial definition.

type QueryResolver {
    departments: [Department]
    department(id: ID!): Department!
}

type MutationResolver {
    newDepartment(department: DepartmentInput!): Department
}

input DepartmentInput {
    name: String!
    organizationId: Int
}

type Department {
    id: ID!
    name: String!
    organization: Organization
    employees: [Employee]
}

Here’s the schema inside the file organization.graphqls. As you can see, I’m using the extend keyword on QueryResolver and MutationResolver.

extend type QueryResolver {
    organizations: [Organization]
    organization(id: ID!): Organization!
}

extend type MutationResolver {
    newOrganization(organization: OrganizationInput!): Organization
}

input OrganizationInput {
    name: String!
}

type Organization {
    id: ID!
    name: String!
    employees: [Employee]
    departments: [Department]
}

The schema for Employee is a little bit more complicated than the two previously demonstrated schemas. I have defined an input object for filtering. It will be discussed in detail in the next section.

extend type QueryResolver {
  employees: [Employee]
  employeesWithFilter(filter: EmployeeFilter): [Employee]
  employee(id: ID!): Employee!
}

extend type MutationResolver {
  newEmployee(employee: EmployeeInput!): Employee
}

input EmployeeInput {
  firstName: String!
  lastName: String!
  position: String!
  salary: Int
  age: Int
  organizationId: Int!
  departmentId: Int!
}

type Employee {
  id: ID!
  firstName: String!
  lastName: String!
  position: String!
  salary: Int
  age: Int
  department: Department
  organization: Organization
}

input EmployeeFilter {
  salary: FilterField
  age: FilterField
  position: FilterField
}

input FilterField {
  operator: String!
  value: String!
}

schema {
  query: QueryResolver
  mutation: MutationResolver
}

3. Domain model

Let’s take a look at the corresponding domain model. Here’s Employee entity. Each Employee is assigned to a single Department and Organization.

@Entity
@Data
@NoArgsConstructor
@AllArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class Employee {
   @Id
   @GeneratedValue
   @EqualsAndHashCode.Include
   private Integer id;
   private String firstName;
   private String lastName;
   private String position;
   private int salary;
   private int age;
   @ManyToOne(fetch = FetchType.LAZY)
   private Department department;
   @ManyToOne(fetch = FetchType.LAZY)
   private Organization organization;
}

Here’s Department entity. It contains a list of employees and a reference to a single organization.


@Entity
@Data
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class Department {
   @Id
   @GeneratedValue
   @EqualsAndHashCode.Include
   private Integer id;
   private String name;
   @OneToMany(mappedBy = "department")
   private Set<Employee> employees;
   @ManyToOne(fetch = FetchType.LAZY)
   private Organization organization;
}

And finally Organization entity.

@Entity
@Data
@AllArgsConstructor
@NoArgsConstructor
@EqualsAndHashCode(onlyExplicitlyIncluded = true)
public class Organization {
   @Id
   @GeneratedValue
   @EqualsAndHashCode.Include
   private Integer id;
   private String name;
   @OneToMany(mappedBy = "organization")
   private Set<Department> departments;
   @OneToMany(mappedBy = "organization")
   private Set<Employee> employees;
}

Entity classes are returned as query results. In mutations we are using input objects, which have a slightly different implementation. They do not contain references to related entities, but only the ids of the related objects.

@Data
@AllArgsConstructor
@NoArgsConstructor
public class DepartmentInput {
   private String name;
   private Integer organizationId;
}

4. Fetch relations

As you probably figured out, all the JPA relations are configured in lazy mode. To fetch them we should explicitly request them in our GraphQL query. For example, we may query all departments and fetch the organization for each department returned on the list.


{
  departments {
    id
    name
    organization {
      id
      name
    }
  }
}

Now, the question is how to handle it on the server side. The first thing we need to do is to detect the existence of such a relationship field in our GraphQL query. Why? Because we need to avoid the possible N+1 problem, which happens when the data access framework executes N additional SQL statements to fetch the same data that could have been retrieved when executing the primary SQL query. So, we need to prepare different JPA queries depending on the parameters set in the GraphQL query. We may do it in several ways, but the most convenient is using the DataFetchingEnvironment parameter inside the QueryResolver implementation.
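The selection-set-driven branching can be pictured with a plain Set standing in for DataFetchingFieldSelectionSet. This is a simplified, framework-free sketch; the class and method names are illustrative, and the real API is used in the resolver shown in this section.

```java
import java.util.Set;

public class FetchStrategyDemo {

    // Picks a fetch strategy based on which relationship fields the client
    // requested - a simplified stand-in for calling
    // DataFetchingFieldSelectionSet.contains(...) in the resolver.
    static String strategy(Set<String> requestedFields) {
        boolean employees = requestedFields.contains("employees");
        boolean organization = requestedFields.contains("organization");
        if (employees && organization) return "fetchEmployees+fetchOrganization";
        if (employees) return "fetchEmployees";
        if (organization) return "fetchOrganization";
        return "plainFindAll";
    }

    public static void main(String[] args) {
        // The query above asks for id, name and organization of each department
        System.out.println(strategy(Set.of("id", "name", "organization")));
        // fetchOrganization
    }
}
```

Only when a relationship field actually appears in the selection set do we pay for the join; otherwise the cheapest query is executed.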
Let’s take a look at the implementation of the QueryResolver for Department. If we annotate a class that implements GraphQLQueryResolver with @Component, it is automatically detected by Spring Boot (thanks to graphql-spring-boot-starter). Then we add DataFetchingEnvironment as a parameter to each query. After that we invoke the getSelectionSet() method on the DataFetchingEnvironment object and check if it contains the word organization (for fetching the Organization) or employees (for fetching the list of employees). Depending on the requested relations we build different queries. In the following fragment of code two methods are implemented for DepartmentQueryResolver: departments and department.

@Component
public class DepartmentQueryResolver implements GraphQLQueryResolver {

   private DepartmentRepository repository;

   DepartmentQueryResolver(DepartmentRepository repository) {
      this.repository = repository;
   }

   public Iterable<Department> departments(DataFetchingEnvironment environment) {
      DataFetchingFieldSelectionSet s = environment.getSelectionSet();
      if (s.contains("employees") && !s.contains("organization"))
         return repository.findAll(fetchEmployees());
      else if (!s.contains("employees") && s.contains("organization"))
         return repository.findAll(fetchOrganization());
      else if (s.contains("employees") && s.contains("organization"))
         return repository.findAll(fetchEmployees().and(fetchOrganization()));
      else
         return repository.findAll();
   }

   public Department department(Integer id, DataFetchingEnvironment environment) {
      Specification<Department> spec = byId(id);
      DataFetchingFieldSelectionSet selectionSet = environment.getSelectionSet();
      if (selectionSet.contains("employees"))
         spec = spec.and(fetchEmployees());
      if (selectionSet.contains("organization"))
         spec = spec.and(fetchOrganization());
      return repository.findOne(spec).orElseThrow(NoSuchElementException::new);
   }
   
   // REST OF IMPLEMENTATION ...
}

The most convenient way to build dynamic queries is by using JPA Criteria API. To be able to use it with Spring Data JPA we first need to extend our repository interface with JpaSpecificationExecutor interface. After that you may use the additional interface methods that let you execute specifications in a variety of ways. You may choose between findAll and findOne methods.

public interface DepartmentRepository extends CrudRepository<Department, Integer>,
      JpaSpecificationExecutor<Department> {

}

Finally, we may just prepare methods that build the Specification objects. Each of them contains a predicate. In this case we are using three predicates: for fetching the organization, for fetching employees, and for filtering by id.

private Specification<Department> fetchOrganization() {
   return (Specification<Department>) (root, query, builder) -> {
      Fetch<Department, Organization> f = root.fetch("organization", JoinType.LEFT);
      Join<Department, Organization> join = (Join<Department, Organization>) f;
      return join.getOn();
   };
}

private Specification<Department> fetchEmployees() {
   return (Specification<Department>) (root, query, builder) -> {
      Fetch<Department, Employee> f = root.fetch("employees", JoinType.LEFT);
      Join<Department, Employee> join = (Join<Department, Employee>) f;
      return join.getOn();
   };
}

private Specification<Department> byId(Integer id) {
   return (Specification<Department>) (root, query, builder) -> builder.equal(root.get("id"), id);
}

5. Filtering

For a start, let’s refer back to section 2 – Schemas. Inside employee.graphqls I defined two additional inputs, FilterField and EmployeeFilter, and also a single method employeesWithFilter that takes EmployeeFilter as an argument. The FilterField class is my custom implementation of a filter for GraphQL queries. It is very simple. It provides an implementation of two filter types: for numbers and for strings. It generates a JPA Criteria Predicate. Of course, instead of creating such a filter implementation yourself (like me), you may leverage some existing libraries for that. However, it does not require much time to do it yourself, as you can see in the following code. Our custom filter implementation has two parameters: operator and value.

@Data
public class FilterField {
   private String operator;
   private String value;

   public Predicate generateCriteria(CriteriaBuilder builder, Path field) {
      try {
         int v = Integer.parseInt(value);
         switch (operator) {
         case "lt": return builder.lt(field, v);
         case "le": return builder.le(field, v);
         case "gt": return builder.gt(field, v);
         case "ge": return builder.ge(field, v);
         case "eq": return builder.equal(field, v);
         }
      } catch (NumberFormatException e) {
         switch (operator) {
         case "endsWith": return builder.like(field, "%" + value);
         case "startsWith": return builder.like(field, value + "%");
         case "contains": return builder.like(field, "%" + value + "%");
         case "eq": return builder.equal(field, value);
         }
      }

      return null;
   }
}
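To make the operator semantics concrete, here is a plain-Java evaluator using the same operator names, applied directly to in-memory values instead of building a JPA Predicate. This is an illustration only; the class and method names are hypothetical and not part of the sample project.

```java
public class FilterEvaluator {

    // Evaluates the same operators as FilterField, but directly against an
    // in-memory value instead of producing a JPA Criteria Predicate.
    static boolean matches(String operator, String filterValue, Object fieldValue) {
        try {
            int v = Integer.parseInt(filterValue);
            int f = ((Number) fieldValue).intValue();
            switch (operator) {
                case "lt": return f < v;
                case "le": return f <= v;
                case "gt": return f > v;
                case "ge": return f >= v;
                case "eq": return f == v;
            }
        } catch (NumberFormatException e) {
            // filterValue is not a number, so fall back to string operators
            String f = fieldValue.toString();
            switch (operator) {
                case "endsWith": return f.endsWith(filterValue);
                case "startsWith": return f.startsWith(filterValue);
                case "contains": return f.contains(filterValue);
                case "eq": return f.equals(filterValue);
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("gt", "12000", 13000));           // true
        System.out.println(matches("eq", "Developer", "Developer")); // true
    }
}
```

The numeric/string dispatch mirrors the try/catch around Integer.parseInt in FilterField: a parseable value selects the numeric operators, anything else the string operators.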

Now, with FilterField we may create a concrete filter implementation consisting of several simple FilterField objects. An example of such an implementation is the EmployeeFilter class, which has three possible filtering criteria: by salary, age and position.

@Data
public class EmployeeFilter {
   private FilterField salary;
   private FilterField age;
   private FilterField position;
}

Now, if you would like to use that filter in your GraphQL query, you should create something like this. In this query we are searching for all developers that have a salary greater than 12000 and are older than 30 years.

{
  employeesWithFilter(filter: {
    salary: {
      operator: "gt"
      value: "12000"
    },
    age: {
      operator: "gt"
      value: "30"
    },
    position: {
      operator: "eq",
      value: "Developer"
    }
  }) {
    id
    firstName
    lastName
    position
  }
}

Let’s take a look at the implementation of the query resolver. The same as for fetching relations, we are using the JPA Criteria API and the Specification class. I have three methods that create a Specification for each possible filter field. Then I dynamically build the filtering criteria based on the content of EmployeeFilter.

@Component
public class EmployeeQueryResolver implements GraphQLQueryResolver {

   private EmployeeRepository repository;

   EmployeeQueryResolver(EmployeeRepository repository) {
      this.repository = repository;
   }

   // OTHER FIND METHODS ...
   
   public Iterable<Employee> employeesWithFilter(EmployeeFilter filter) {
      Specification<Employee> spec = null;
      if (filter.getSalary() != null)
         spec = bySalary(filter.getSalary());
      if (filter.getAge() != null)
         spec = (spec == null ? byAge(filter.getAge()) : spec.and(byAge(filter.getAge())));
      if (filter.getPosition() != null)
         spec = (spec == null ? byPosition(filter.getPosition()) :
               spec.and(byPosition(filter.getPosition())));
      if (spec != null)
         return repository.findAll(spec);
      else
         return repository.findAll();
   }

   private Specification<Employee> bySalary(FilterField filterField) {
      return (Specification<Employee>) (root, query, builder) -> filterField.generateCriteria(builder, root.get("salary"));
   }

   private Specification<Employee> byAge(FilterField filterField) {
      return (Specification<Employee>) (root, query, builder) -> filterField.generateCriteria(builder, root.get("age"));
   }

   private Specification<Employee> byPosition(FilterField filterField) {
      return (Specification<Employee>) (root, query, builder) -> filterField.generateCriteria(builder, root.get("position"));
   }
}

6. Testing Spring Boot GraphQL JPA support

We will insert some test data into the H2 database by defining data.sql inside src/main/resources directory.

insert into organization (id, name) values (1, 'Test1');
insert into organization (id, name) values (2, 'Test2');
insert into organization (id, name) values (3, 'Test3');
insert into organization (id, name) values (4, 'Test4');
insert into organization (id, name) values (5, 'Test5');
insert into department (id, name, organization_id) values (1, 'Test1', 1);
insert into department (id, name, organization_id) values (2, 'Test2', 1);
insert into department (id, name, organization_id) values (3, 'Test3', 1);
insert into department (id, name, organization_id) values (4, 'Test4', 2);
insert into department (id, name, organization_id) values (5, 'Test5', 2);
insert into department (id, name, organization_id) values (6, 'Test6', 3);
insert into department (id, name, organization_id) values (7, 'Test7', 4);
insert into department (id, name, organization_id) values (8, 'Test8', 5);
insert into department (id, name, organization_id) values (9, 'Test9', 5);
insert into employee (id, first_name, last_name, position, salary, age, department_id, organization_id) values (1, 'John', 'Smith', 'Developer', 10000, 30, 1, 1);
insert into employee (id, first_name, last_name, position, salary, age, department_id, organization_id) values (2, 'Adam', 'Hamilton', 'Developer', 12000, 35, 1, 1);
insert into employee (id, first_name, last_name, position, salary, age, department_id, organization_id) values (3, 'Tracy', 'Smith', 'Architect', 15000, 40, 1, 1);
insert into employee (id, first_name, last_name, position, salary, age, department_id, organization_id) values (4, 'Lucy', 'Kim', 'Developer', 13000, 25, 2, 1);
insert into employee (id, first_name, last_name, position, salary, age, department_id, organization_id) values (5, 'Peter', 'Wright', 'Director', 50000, 50, 4, 2);
insert into employee (id, first_name, last_name, position, salary, age, department_id, organization_id) values (6, 'Alan', 'Murray', 'Developer', 20000, 37, 4, 2);
insert into employee (id, first_name, last_name, position, salary, age, department_id, organization_id) values (7, 'Pamela', 'Anderson', 'Analyst', 7000, 27, 4, 2);

Now, we can easily perform some test queries using GraphiQL, which is embedded into our application and available at http://localhost:8080/graphiql after startup. First, let’s verify the filtering query.

[Screenshot: GraphiQL – result of the filtering query]

Now, we may test fetching by searching Department by id and fetching a list of employees and organization.

[Screenshot: GraphiQL – fetching a Department by id with its employees and organization]

The post An Advanced Guide to GraphQL with Spring Boot appeared first on Piotr's TechBlog.

]]>
https://piotrminkowski.com/2020/07/31/an-advanced-guide-to-graphql-with-spring-boot/feed/ 6 8220
Distributed Transactions in Microservices with Spring Boot https://piotrminkowski.com/2020/06/19/distributed-transactions-in-microservices-with-spring-boot/ https://piotrminkowski.com/2020/06/19/distributed-transactions-in-microservices-with-spring-boot/#comments Fri, 19 Jun 2020 10:13:34 +0000 http://piotrminkowski.com/?p=8144 When I’m talking about microservices with other people they are often asking me about an approach to distributed transactions. My advice is always the same – try to completely avoid distributed transactions in your microservices architecture. It is a very complex process with a lot of moving parts that can fail. That’s why it does […]

The post Distributed Transactions in Microservices with Spring Boot appeared first on Piotr's TechBlog.

]]>
When I’m talking about microservices with other people, they often ask me about an approach to distributed transactions. My advice is always the same – try to completely avoid distributed transactions in your microservices architecture. It is a very complex process with a lot of moving parts that can fail. That’s why it does not fit the nature of microservices-based systems.

However, if for any reason you need to use distributed transactions, there are two popular approaches: the Two-Phase Commit protocol, and Eventual Consistency with Compensation, also known as the Saga pattern. You can read some interesting articles about them online. Most of them discuss the theoretical aspects related to those approaches, so in this article I’m going to present a sample implementation in Spring Boot. It is worth mentioning that there are some ready implementations of the Saga pattern, like the support for complex business transactions provided by Axon Framework. The documentation of this solution is available here: https://docs.axoniq.io/reference-guide/implementing-domain-logic/complex-business-transactions.
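The core idea of the Saga pattern – run local transactions step by step and, on failure, execute compensating actions in reverse order – can be sketched in a few lines of plain Java. The names and steps here are illustrative and not taken from the sample project.

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class SagaDemo {

    // Runs each step; if one fails, executes the compensations recorded so far
    // in reverse order (the last completed step is compensated first).
    static boolean runSaga(Runnable[] steps, Runnable[] compensations) {
        Deque<Runnable> done = new ArrayDeque<>();
        for (int i = 0; i < steps.length; i++) {
            try {
                steps[i].run();
                done.push(compensations[i]);
            } catch (RuntimeException e) {
                while (!done.isEmpty()) {
                    done.pop().run(); // compensate in reverse order
                }
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        StringBuilder log = new StringBuilder();
        boolean ok = runSaga(
                new Runnable[]{
                        () -> log.append("reserveProducts;"),
                        () -> { throw new RuntimeException("payment failed"); }},
                new Runnable[]{
                        () -> log.append("releaseProducts;"),
                        () -> log.append("refund;")});
        System.out.println(ok + " " + log); // false reserveProducts;releaseProducts;
    }
}
```

In a real system each step and each compensation would be a local database transaction in a different microservice, coordinated by events rather than in-process calls.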

Example

The source code with sample applications is as usual available on GitHub in the repository: https://github.com/piomin/sample-spring-microservices-transactions.git.

Architecture

First, we need to add a new component to our system. It is responsible solely for managing distributed transactions across microservices. That element is described as transaction-server in the diagram below. We also use another popular component in microservices-based architecture: discovery-server. There are three applications: order-service, account-service and product-service. The order-service application communicates with account-service and product-service. All these applications use a Postgres database as a backend store. Just for simplification I have run a single database with multiple tables. Normally, we would have a single database per microservice.

[Diagram: architecture with transaction-server, discovery-server, order-service, account-service and product-service]

Now, we will consider the following situation (visualized in the diagram below). The order-service application creates an order, stores it in the database, and then starts a new distributed transaction (1). After that, it communicates with product-service to update the current number of stored products and get their price (2). At the same time product-service sends information to transaction-server that it is participating in the transaction (3). Then order-service tries to withdraw the required funds from the customer account and transfer them into another account related to a seller (4). Finally, we roll back the transaction by throwing an exception inside the transactional method in order-service (6). This rollback should cause a rollback of the whole distributed transaction.

[Diagram: distributed transaction flow across the microservices]

Building transaction server

We are starting implementation from transaction-server. A transaction server is responsible for managing distributed transactions across all microservices in our sample system. It exposes REST API available for all other microservices for adding new transactions and updating their status. It also sends asynchronous broadcast events after receiving transaction confirmation or rollback from a source microservice. It uses RabbitMQ message broker for sending events to other microservices via topic exchange. All other microservices are listening for incoming events, and after receiving them they are committing or rolling back transactions. We can avoid using a message broker for exchanging events and use communication over HTTP endpoints, but that makes sense only if we have a single instance of every microservice. Here’s the picture that illustrates the currently described architecture.

[Diagram: transaction-server exchanging events with the microservices via RabbitMQ]

Let’s take a look at the list of required dependencies. It will be pretty much the same for the other applications. We need spring-boot-starter-amqp for integration with RabbitMQ, spring-boot-starter-web for exposing a REST API over HTTP, spring-cloud-starter-netflix-eureka-client for integration with the Eureka discovery server, and some basic Kotlin libraries.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.module</groupId>
   <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-stdlib</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

In the main class we are defining a topic exchange for events sent to microservices. The name of exchange is trx-events, and it is automatically created on RabbitMQ after application startup.

@SpringBootApplication
class TransactionServerApp {

    @Bean
    fun topic(): TopicExchange = TopicExchange("trx-events")
}

fun main(args: Array<String>) {
    runApplication<TransactionServerApp>(*args)
}

Here are domain model classes used by a transaction server. The same classes are used by the microservices during communication with transaction-server.

data class DistributedTransaction(var id: String? = null, var status: DistributedTransactionStatus,
                                  val participants: MutableList<DistributedTransactionParticipant> = mutableListOf())
                          
class DistributedTransactionParticipant(val serviceId: String, var status: DistributedTransactionStatus)

enum class DistributedTransactionStatus {
    NEW, CONFIRMED, ROLLBACK, TO_ROLLBACK
}   

Here’s the controller class. It uses a simple in-memory repository implementation and RabbitTemplate for sending events to RabbitMQ. The HTTP API provides methods for adding a new transaction, finishing an existing transaction with a given status (CONFIRM or ROLLBACK), searching for a transaction by id, and adding participants (new services) to a transaction.

@RestController
@RequestMapping("/transactions")
class TransactionController(val repository: TransactionRepository,
                            val template: RabbitTemplate) {

    @PostMapping
    fun add(@RequestBody transaction: DistributedTransaction): DistributedTransaction =
            repository.save(transaction)

    @GetMapping("/{id}")
    fun findById(@PathVariable id: String): DistributedTransaction? = repository.findById(id)

    @PutMapping("/{id}/finish/{status}")
    fun finish(@PathVariable id: String, @PathVariable status: DistributedTransactionStatus) {
        val transaction: DistributedTransaction? = repository.findById(id)
        if (transaction != null) {
            transaction.status = status
            repository.update(transaction)
            template.convertAndSend("trx-events", DistributedTransaction(id, status))
        }
    }

    @PutMapping("/{id}/participants")
    fun addParticipant(@PathVariable id: String,
                       @RequestBody participant: DistributedTransactionParticipant) =
            repository.findById(id)?.participants?.add(participant)

    @PutMapping("/{id}/participants/{serviceId}/status/{status}")
    fun updateParticipant(@PathVariable id: String,
                          @PathVariable serviceId: String,
                          @PathVariable status: DistributedTransactionStatus) {
        val transaction: DistributedTransaction? = repository.findById(id)
        if (transaction != null) {
            val index = transaction.participants.indexOfFirst { it.serviceId == serviceId }
            if (index != -1) {
                transaction.participants[index].status = status
                template.convertAndSend("trx-events", DistributedTransaction(id, status))
            }
        }
    }

}   
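The in-memory TransactionRepository injected into the controller is not shown in the article. A minimal sketch could be backed by a ConcurrentHashMap; this is my assumption, not the author's implementation, and the domain classes are repeated here (with a default status added) only to keep the snippet self-contained:

```kotlin
import java.util.UUID
import java.util.concurrent.ConcurrentHashMap

enum class DistributedTransactionStatus { NEW, CONFIRMED, ROLLBACK, TO_ROLLBACK }

data class DistributedTransactionParticipant(val serviceId: String, var status: DistributedTransactionStatus)

data class DistributedTransaction(var id: String? = null,
                                  var status: DistributedTransactionStatus = DistributedTransactionStatus.NEW,
                                  val participants: MutableList<DistributedTransactionParticipant> = mutableListOf())

// Thread-safe, in-memory store keyed by transaction id. A generated UUID is
// assigned on first save, mirroring what the controller expects from save().
class TransactionRepository {
    private val transactions = ConcurrentHashMap<String, DistributedTransaction>()

    fun save(transaction: DistributedTransaction): DistributedTransaction {
        if (transaction.id == null) transaction.id = UUID.randomUUID().toString()
        transactions[transaction.id!!] = transaction
        return transaction
    }

    fun findById(id: String): DistributedTransaction? = transactions[id]

    fun update(transaction: DistributedTransaction) {
        transactions[transaction.id!!] = transaction
    }
}
```

In a real deployment this bean would need to be replaced with a persistent store, since losing the in-memory state loses all in-flight transactions.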

Handling transactions in downstream services

Let’s analyze how our microservices handle transactions, using account-service as an example. Here’s the implementation of AccountService, which is called by the controller for transferring funds from/to an account. All methods here are @Transactional, and one annotation deserves special attention: @Async. It means that each method runs in a new thread and is processed asynchronously. Why? That’s the key concept here. We will block the transaction in order to wait for confirmation from transaction-server, but the main thread used by the controller will not be blocked. It returns the response with the current state of the Account immediately.

@Service
@Transactional
@Async
class AccountService(val repository: AccountRepository,
                     var applicationEventPublisher: ApplicationEventPublisher) {
    
    fun payment(id: Int, amount: Int, transactionId: String) =
            transfer(id, amount, transactionId)

    fun withdrawal(id: Int, amount: Int, transactionId: String) =
            transfer(id, (-1) * amount, transactionId)

    private fun transfer(id: Int, amount: Int, transactionId: String) {
        val accountOpt: Optional<Account> = repository.findById(id)
        if (accountOpt.isPresent) {
            val account: Account = accountOpt.get()
            account.balance += amount
            applicationEventPublisher.publishEvent(AccountTransactionEvent(transactionId, account))
            repository.save(account)
        }
    }

}

Here’s the implementation of the @RestController class. As you see, it calls methods from AccountService that are processed asynchronously. The returned Account object is taken from the EventBus bean. This bean is responsible for exchanging asynchronous events within the application scope. An event is sent by the AccountTransactionListener bean, which is responsible for handling Spring transaction events.

@RestController
@RequestMapping("/accounts")
class AccountController(val repository: AccountRepository,
                        val service: AccountService,
                        val eventBus: EventBus) {

    @PostMapping
    fun add(@RequestBody account: Account): Account = repository.save(account)

    @GetMapping("/customer/{customerId}")
    fun findByCustomerId(@PathVariable customerId: Int): List<Account> =
            repository.findByCustomerId(customerId)

    @PutMapping("/{id}/payment/{amount}")
    fun payment(@PathVariable id: Int, @PathVariable amount: Int,
                @RequestHeader("X-Transaction-ID") transactionId: String): Account {
        service.payment(id, amount, transactionId)
        return eventBus.receiveEvent(transactionId)!!.account
    }

    @PutMapping("/{id}/withdrawal/{amount}")
    fun withdrawal(@PathVariable id: Int, @PathVariable amount: Int,
                   @RequestHeader("X-Transaction-ID") transactionId: String): Account {
        service.withdrawal(id, amount, transactionId)
        return eventBus.receiveEvent(transactionId)!!.account
    }

}

The event object exchanged between beans is very simple. It contains the id of the transaction and the current Account object.


class AccountTransactionEvent(val transactionId: String, val account: Account)
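The EventBus bean itself is not listed in the article. A plausible minimal sketch (my assumption, not the author's code) is a pair of concurrent maps keyed by transaction id, with a polling receive on the consumer side; the method names mirror those used above, while the payload types are simplified stand-ins to keep the snippet self-contained:

```kotlin
import java.util.concurrent.ConcurrentHashMap

// Simplified stand-ins for the AccountTransactionEvent and DistributedTransaction
// classes used elsewhere in the article.
data class AccountTransactionEvent(val transactionId: String, val balance: Int)
data class DistributedTransaction(val id: String, val status: String)

class EventBus {
    private val events = ConcurrentHashMap<String, AccountTransactionEvent>()
    private val transactions = ConcurrentHashMap<String, DistributedTransaction>()

    fun sendEvent(event: AccountTransactionEvent) {
        events[event.transactionId] = event
    }

    // Polls until the event published for this transaction arrives or the timeout elapses.
    fun receiveEvent(transactionId: String, timeoutMs: Long = 10_000): AccountTransactionEvent? {
        val deadline = System.currentTimeMillis() + timeoutMs
        while (System.currentTimeMillis() < deadline) {
            events.remove(transactionId)?.let { return it }
            Thread.sleep(20)
        }
        return null
    }

    fun sendTransaction(transaction: DistributedTransaction) {
        transactions[transaction.id] = transaction
    }

    // Non-blocking: the caller (the transaction listener) does its own retry loop.
    fun receiveTransaction(transactionId: String): DistributedTransaction? =
            transactions.remove(transactionId)
}
```

Note the asymmetry: receiveEvent blocks with its own polling loop, while receiveTransaction returns immediately because AccountTransactionListener (shown below) already retries in a loop.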

Finally, let’s take a look at the implementation of the AccountTransactionListener bean responsible for handling transactional events. We are using Spring’s @TransactionalEventListener for annotating methods that should handle incoming events. There are 4 possible transaction phases to handle: BEFORE_COMMIT, AFTER_COMMIT, AFTER_ROLLBACK and AFTER_COMPLETION. There is one very important thing about @TransactionalEventListener, which may not be very intuitive: the listener is processed in the same thread as the transaction. So if you do something that should not block the transactional thread, you should annotate the listener method with @Async. In our case, however, this behavior is exactly what we need, since we have to block the transactional thread until we receive a confirmation or rollback from transaction-server for the given transaction. These events are sent by transaction-server through RabbitMQ, and they are also exchanged between beans using EventBus. If the status of the received event is different from CONFIRMED, we throw an exception to roll back the transaction.
The AccountTransactionListener also listens on AFTER_ROLLBACK and AFTER_COMPLETION. After receiving such an event, it changes the participant status of the transaction by calling the endpoint exposed by transaction-server.

@Component
class AccountTransactionListener(val restTemplate: RestTemplate,
                                 val eventBus: EventBus) {

    @TransactionalEventListener(phase = TransactionPhase.BEFORE_COMMIT)
    @Throws(AccountProcessingException::class)
    fun handleEvent(event: AccountTransactionEvent) {
        eventBus.sendEvent(event)
        var transaction: DistributedTransaction? = null
        for (x in 0..100) {
            transaction = eventBus.receiveTransaction(event.transactionId)
            if (transaction == null)
                Thread.sleep(100)
            else break
        }
        if (transaction == null || transaction.status != DistributedTransactionStatus.CONFIRMED)
            throw AccountProcessingException()
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_ROLLBACK)
    fun handleAfterRollback(event: AccountTransactionEvent) {
        restTemplate.put("http://transaction-server/transactions/{id}/participants/{serviceId}/status/{status}",
                null, event.transactionId, "account-service", "TO_ROLLBACK")
    }

    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMPLETION)
    fun handleAfterCompletion(event: AccountTransactionEvent) {
        restTemplate.put("http://transaction-server/transactions/{id}/participants/{serviceId}/status/{status}",
                null, event.transactionId, "account-service", "CONFIRMED")
    }
    
}

Here’s the implementation of the bean responsible for receiving asynchronous events from the message broker. As you see, after receiving such an event it uses EventBus to forward it to other beans.

@Component
class DistributedTransactionEventListener(val eventBus: EventBus) {

    @RabbitListener(bindings = [
        QueueBinding(exchange = Exchange(type = ExchangeTypes.TOPIC, name = "trx-events"),
                value = Queue("trx-events-account"))
    ])
    fun onMessage(transaction: DistributedTransaction) {
        eventBus.sendTransaction(transaction)
    }

}

Integration with database

Of course, our application uses Postgres as a backend store, so we need to provide the integration. In fact, that is the simplest step of our implementation. First, we need to add the following two dependencies. We will use Spring Data JPA for the integration with Postgres.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
   <groupId>org.postgresql</groupId>
   <artifactId>postgresql</artifactId>
   <scope>runtime</scope>
</dependency>

Our entity is very simple. Besides the id field, it contains two fields: customerId and balance.


@Entity
data class Account(@Id @GeneratedValue(strategy = GenerationType.AUTO) val id: Int,
                   val customerId: Int,
                   var balance: Int)

We are using the well-known Spring Data repository pattern.

interface AccountRepository: CrudRepository<Account, Int> {

    fun findByCustomerId(id: Int): List<Account>

}

Here’s the suggested list of configuration settings.

spring:
  application:
    name: account-service
  datasource:
    url: jdbc:postgresql://postgresql:5432/trx
    username: trx
    password: trx
    hikari:
      connection-timeout: 2000
      initialization-fail-timeout: 0
  jpa:
    database-platform: org.hibernate.dialect.PostgreSQLDialect
    hibernate:
      ddl-auto: create
    show-sql: true
    properties:
      hibernate:
        format_sql: true
  rabbitmq:
    host: rabbitmq
    port: 5672
    connection-timeout: 2000

Building order-service

Ok, we have already finished the implementation of transaction-server and the two microservices account-service and product-service. Since the implementation of product-service is very similar to account-service, I have explained everything using account-service as an example. Now we may proceed to the last part: the implementation of order-service. It is responsible for starting a new transaction and marking it as finished. It may also finish it with a rollback. Of course, rollback events may be sent by the other two applications as well.
The implementation of the @RestController class is visible below. I’ll describe it step by step. We start a new distributed transaction by calling the POST /transactions endpoint exposed by transaction-server (1). Then we store a new order in the database (2). When we call a transactional method from a downstream service, we need to set the HTTP header X-Transaction-ID. The first transactional method called here is PUT /products/{id}/count/{count} (3). It updates the number of products in the store, and we calculate the final price (4). In the next step we call another transactional method, this time from account-service (5). It is responsible for withdrawing money from the customer’s account. We enable Spring transaction event processing (6). In the last step we generate a random number and, depending on its value, the application throws an exception to roll back the transaction (7).

@RestController
@RequestMapping("/orders")
class OrderController(val repository: OrderRepository,
                      val restTemplate: RestTemplate,
                      var applicationEventPublisher: ApplicationEventPublisher) {

    @PostMapping
    @Transactional
    @Throws(OrderProcessingException::class)
    fun addAndRollback(@RequestBody order: Order) {
        var transaction  = restTemplate.postForObject("http://transaction-server/transactions",
                DistributedTransaction(), DistributedTransaction::class.java) // (1)
        val orderSaved = repository.save(order) // (2)
        val product = updateProduct(transaction!!.id!!, order) // (3)
        val totalPrice = product.price * product.count // (4)
        val accounts = restTemplate.getForObject("http://account-service/accounts/customer/{customerId}",
                Array<Account>::class.java, order.customerId)
        val account  = accounts!!.first { it.balance >= totalPrice}
        updateAccount(transaction.id!!, account.id, totalPrice) // (5)
        applicationEventPublisher.publishEvent(OrderTransactionEvent(transaction.id!!)) // (6)
        val r = Random.nextInt(100) // (7)
        if (r % 2 == 0)
            throw OrderProcessingException()
    }

    fun updateProduct(transactionId: String, order: Order): Product {
        val headers = HttpHeaders()
        headers.set("X-Transaction-ID", transactionId)
        val entity: HttpEntity<*> = HttpEntity<Any?>(headers)
        val product = restTemplate.exchange("http://product-service/products/{id}/count/{count}",
                HttpMethod.PUT, entity, Product::class.java, order.id, order.count)
        return product.body!!
    }

    fun updateAccount(transactionId: String, accountId: Int, totalPrice: Int): Account {
        val headers = HttpHeaders()
        headers.set("X-Transaction-ID", transactionId)
        val entity: HttpEntity<*> = HttpEntity<Any?>(headers)
        val account = restTemplate.exchange("http://account-service/accounts/{id}/withdrawal/{amount}",
                HttpMethod.PUT, entity, Account::class.java, accountId, totalPrice)
        return account.body!!
    }
}
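The Order entity and the OrderTransactionEvent class are not listed in the article. Based on the fields referenced in the controller above (order.customerId, order.id, order.count), a minimal version could look as follows; the exact field set and the event shape are my assumptions, and the JPA annotations are omitted to keep the sketch self-contained:

```kotlin
// Hypothetical order-service domain classes inferred from the controller code above.
data class Order(val id: Int = 0,
                 val customerId: Int = 0,
                 val count: Int = 0)

// Published in step (6) to trigger the @TransactionalEventListener methods
// in order-service, carrying the distributed transaction id.
class OrderTransactionEvent(val transactionId: String)
```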

Conclusion

Even a trivial implementation of distributed transactions in microservices, like the one demonstrated in this article, can be complicated. As you see, we needed to add a new element to our architecture, transaction-server, responsible only for distributed transaction management. We also had to add a message broker in order to exchange events between our applications and transaction-server. However, many of you have asked me about distributed transactions in the microservices world, so I decided to build this simple demo. I’m waiting for your feedback and opinions.

The post Distributed Transactions in Microservices with Spring Boot appeared first on Piotr's TechBlog.
