Piotr's TechBlog, posts tagged "streams" (https://piotrminkowski.com/tag/streams/)

Kafka Streams with Quarkus
https://piotrminkowski.com/2021/11/24/kafka-streams-with-quarkus/
Wed, 24 Nov 2021 08:24:53 +0000

In this article, you will learn how to use Kafka Streams with Quarkus. As in my previous article, we will create a simple application that simulates the stock market, but this time we are going to use Quarkus instead of Spring Cloud. If you would like to find out what a streaming platform is and how it differs from a traditional message broker, this article is for you. Moreover, we will look at the useful improvements related to Apache Kafka provided by Quarkus.

Source Code

If you would like to try it by yourself, you may always take a look at my source code. In order to do that you need to clone my GitHub repository. After that, you should just follow my instructions. Let’s begin.

Architecture

In our case, there are two incoming streams of events. Both of them represent incoming orders. These orders are generated by the order-service application. It sends buy orders to the orders.buy topic and sell orders to the orders.sell topic. Then, the stock-service application receives and handles incoming events. In the first step, it needs to change the key of each message from the orderId to the productId. That’s because it has to join orders from different topics related to the same product in order to execute transactions. Finally, the transaction price is the average of the sell and buy prices.

[Figure: quarkus-kafka-streams-arch]

We are building a simplified version of the stock market platform. Each buy order contains the maximum price at which a customer is willing to buy a product. On the other hand, each sell order contains the minimum price at which a customer is ready to sell a product. If the sell order price is not greater than the buy order price for a particular product, we perform a transaction.

Each order is valid for 10 seconds. After that time the stock-service application will not handle such an order since it is considered as expired. Each order contains a number of products for a transaction. For example, we may sell 100 for 10 or buy 200 for 11. Therefore, an order may be fully or partially realized. The stock-service application tries to join partially realized orders to other new or partially realized orders. You can see the visualization of that process in the picture below.

[Figure: quarkus-kafka-streams-app]
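To make the matching rule concrete, here is a minimal plain-Java sketch of it (Java 16+ because of records). It is not taken from the sample repository: the OrderMatchingSketch class and the Order and Trade records are hypothetical, simplified stand-ins, and expiration is left out on purpose.

```java
import java.util.Optional;

// Minimal sketch of the matching rule; all names here are illustrative only.
class OrderMatchingSketch {

    // Simplified order: price is the buyer's maximum or the seller's minimum.
    record Order(long id, int productId, int productCount, int price) {}

    // Simplified transaction produced by a successful match.
    record Trade(long buyOrderId, long sellOrderId, int productCount, int price) {}

    // A match requires the same product and a sell price not greater than the buy price.
    static Optional<Trade> match(Order buy, Order sell) {
        if (buy.productId() != sell.productId() || sell.price() > buy.price()) {
            return Optional.empty();
        }
        // Only the smaller of the two quantities can be traded.
        int count = Math.min(buy.productCount(), sell.productCount());
        // Integer average of both prices (the fraction is truncated).
        int price = (buy.price() + sell.price()) / 2;
        return Optional.of(new Trade(buy.id(), sell.id(), count, price));
    }

    public static void main(String[] args) {
        Order buy = new Order(1, 7, 200, 11);   // buy 200 for at most 11
        Order sell = new Order(2, 7, 100, 10);  // sell 100 for at least 10
        match(buy, sell).ifPresent(t ->
            System.out.println(t.productCount() + " items at price " + t.price()));
    }
}
```

With the numbers from the example above (sell 100 for 10, buy 200 for 11), the sell order is fully realized while 100 items of the buy order stay open for another match.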

Run Apache Kafka locally

Before we jump to the implementation, we need to run a local instance of Apache Kafka. If you don’t want to install it on your laptop, the best way to run it is with Redpanda. Redpanda is a Kafka API compatible streaming platform. In comparison to Kafka, it is relatively easy to run locally. Normally, you would have to install Redpanda on your laptop and then create a cluster using its CLI. But with Quarkus you don’t need to do that! The only requirement is to have Docker installed. Thanks to a feature of the Quarkus Kafka extension called Dev Services, Quarkus automatically starts a Kafka broker in dev mode and when running tests. Moreover, the application is configured automatically.

The only thing you need to do in order to enable that feature is NOT to provide any Kafka address in the configuration properties. Dev Services uses Testcontainers to run Kafka, so if you have Docker or any other environment supporting Testcontainers running, you get a containerized instance of Kafka out of the box. One more important thing: start the order-service application first. It automatically creates all the required topics in Kafka. Then run the stock-service application. It uses the Quarkus Kafka Streams extension and verifies that the required topics exist. Let’s visualize it.

[Figure: quarkus-kafka-streams-run]

Send events to Kafka with Quarkus

There are several ways to send events to Kafka with Quarkus. Because we need to send a key/value pair, we will use the io.smallrye.reactive.messaging.kafka.Record object for that. Quarkus is able to generate and send data continuously. In the code fragment below, we send a single Order event every 500 ms. Each Order contains a random productId, price and productCount.

@Outgoing("orders-buy")
public Multi<Record<Long, Order>> buyOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.BUY,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

@Outgoing("orders-sell")
public Multi<Record<Long, Order>> sellOrdersGenerator() {
   return Multi.createFrom().ticks().every(Duration.ofMillis(500))
      .map(order -> {
         Integer productId = random.nextInt(10) + 1;
         int price = prices.get(productId) + random.nextInt(200);
         Order o = new Order(
             incrementOrderId(),
             random.nextInt(1000) + 1,
             productId,
             100 * (random.nextInt(5) + 1),
             LocalDateTime.now(),
             OrderType.SELL,
             price);
         log.infof("Sent: %s", o);
         return Record.of(o.getId(), o);
   });
}

We will also define a single @Incoming channel in order to receive transactions produced by the stock-service. Thanks to that, Quarkus will automatically create the transactions topic used by Quarkus Kafka Streams in stock-service. To be honest, I was not able to force the Quarkus Kafka Streams extension to create the topic automatically. It seems we need to use the SmallRye Reactive Messaging extension for that.

@Incoming("transactions")
public void transactions(Transaction transaction) {
   log.infof("New: %s", transaction);
}

Of course, we need to include the SmallRye Reactive Messaging dependency in the Maven pom.xml.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-reactive-messaging-kafka</artifactId>
</dependency>

Finally, let’s provide configuration settings. We have two outgoing topics and a single incoming topic. We can set their names. Otherwise, Quarkus uses the name of the channel as the topic name. The names of our topics are orders.buy, orders.sell and transactions.

mp.messaging.outgoing.orders-buy.connector = smallrye-kafka
mp.messaging.outgoing.orders-buy.topic = orders.buy
mp.messaging.outgoing.orders-buy.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-buy.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.outgoing.orders-sell.connector = smallrye-kafka
mp.messaging.outgoing.orders-sell.topic = orders.sell
mp.messaging.outgoing.orders-sell.key.serializer = org.apache.kafka.common.serialization.LongSerializer
mp.messaging.outgoing.orders-sell.value.serializer = io.quarkus.kafka.client.serialization.ObjectMapperSerializer

mp.messaging.incoming.transactions.connector = smallrye-kafka
mp.messaging.incoming.transactions.topic = transactions
mp.messaging.incoming.transactions.value.deserializer = pl.piomin.samples.streams.order.model.deserializer.TransactionDeserializer

That’s all. Our orders generator is ready. If you run the order-service application, Quarkus will also start a Kafka (Redpanda) instance. But first, let’s switch to the second sample application, stock-service.

Consume Kafka Streams with Quarkus

In the previous section, we were sending messages to the Kafka broker. Therefore, we used a standard Quarkus library for integration with Kafka based on the SmallRye Reactive Messaging framework. The stock-service application consumes messages as streams, so now we will use a module for Kafka Streams integration.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-kafka-streams</artifactId>
</dependency>

Our application also uses a database, an ORM layer and includes some other useful modules.

<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-hibernate-orm-panache</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-jdbc-h2</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-openapi</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-resteasy-jackson</artifactId>
</dependency>
<dependency>
   <groupId>io.quarkus</groupId>
   <artifactId>quarkus-smallrye-health</artifactId>
</dependency>

In the first step, we are going to merge both streams of orders (buy and sell), insert the Order into the database, and print the event message. You could ask: why do I use a database and an ORM layer here, since I have the Kafka KTable? Well, I need transactions with lock support in order to coordinate the status of order realization (refer to the description in the introduction of fully and partially realized orders). I will give you more details about it in the next sections.

In order to process streams with Quarkus, we need to declare the org.apache.kafka.streams.Topology bean. It contains all the KStream and KTable definitions. Let’s start just with the part responsible for creating and emitting transactions from incoming orders. There are two KStream definitions created. The first of them is responsible for merging two order streams into a single one and then inserting a new Order into a database. The second of them creates and executes transactions by joining two streams using the productId key. But more about it in the next section.

@Produces
public Topology buildTopology() {
   ObjectMapperSerde<Order> orderSerde = 
      new ObjectMapperSerde<>(Order.class);
   ObjectMapperSerde<Transaction> transactionSerde = 
      new ObjectMapperSerde<>(Transaction.class);

   StreamsBuilder builder = new StreamsBuilder();

   KStream<Long, Order> orders = builder.stream(
      ORDERS_SELL_TOPIC,
      Consumed.with(Serdes.Long(), orderSerde));

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .merge(orders)
      .peek((k, v) -> {
         log.infof("New: %s", v);
         logic.add(v);
      });

   builder.stream(ORDERS_BUY_TOPIC, 
         Consumed.with(Serdes.Long(), orderSerde))
      .selectKey((k, v) -> v.getProductId())
      .join(orders.selectKey((k, v) -> v.getProductId()),
         this::execute,
         JoinWindows.of(Duration.ofSeconds(10)),
         StreamJoined.with(Serdes.Integer(), orderSerde, orderSerde))
      .filterNot((k, v) -> v == null)
      .map((k, v) -> new KeyValue<>(v.getId(), v))
      .peek((k, v) -> log.infof("Done -> %s", v))
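      .to(TRANSACTIONS_TOPIC, Produced.with(Serdes.Long(), transactionSerde));

   // Build the topology from the stream definitions above and expose it as a bean
   return builder.build();
}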

To process the streams we need to add configuration properties. A list of input topics is required. We can also override a default application id and enable Kafka health check.

quarkus.kafka-streams.application-id = stock
quarkus.kafka-streams.topics = orders.buy,orders.sell
quarkus.kafka.health.enabled = true

Operations on Kafka Streams

Now we can use some operations on Kafka Streams that are more advanced than merging two streams. In fact, this is the key logic in our application. We need to join two different order streams into a single one using the productId as the joining key. Since the producer sets orderId as the message key, we first need to invoke the selectKey method for both the orders.sell and orders.buy streams. In our case, joining buy and sell orders related to the same product is just the first step. Then we need to verify that the maximum price in the buy order is not lower than the minimum price in the sell order.

The next step is to verify that neither of these orders has been fully realized previously, as they may also be paired with other orders in the stream. If all the conditions are met, we may create a new transaction. Finally, we change the stream key from productId to transactionId and send the transaction to the dedicated transactions topic.
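The idea of joining by productId within a 10-second window can be sketched in plain Java, without Kafka. The WindowedJoinSketch class and its records are hypothetical, and the nested loop only models the pairing rule (same key, timestamps no further apart than the window); it says nothing about how Kafka Streams stores or evicts windowed records.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of a windowed join by productId; not the Kafka Streams API.
class WindowedJoinSketch {

    record Order(long id, int productId, Instant timestamp) {}

    record Pair(Order buy, Order sell) {}

    // Pairs every buy order with every sell order of the same product
    // whose timestamp differs by no more than the given window.
    static List<Pair> join(List<Order> buys, List<Order> sells, Duration window) {
        List<Pair> result = new ArrayList<>();
        for (Order b : buys) {
            for (Order s : sells) {
                boolean sameKey = b.productId() == s.productId();
                Duration gap = Duration.between(b.timestamp(), s.timestamp()).abs();
                if (sameKey && gap.compareTo(window) <= 0) {
                    result.add(new Pair(b, s));
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Instant t0 = Instant.parse("2021-11-24T08:00:00Z");
        List<Order> buys = List.of(new Order(1, 5, t0));
        List<Order> sells = List.of(
            new Order(2, 5, t0.plusSeconds(4)),    // same product, inside the window
            new Order(3, 5, t0.plusSeconds(15)),   // same product, outside the window
            new Order(4, 6, t0.plusSeconds(1)));   // different product
        for (Pair p : join(buys, sells, Duration.ofSeconds(10))) {
            System.out.println("buy " + p.buy().id() + " joined with sell " + p.sell().id());
        }
    }
}
```

Note that one order can show up in several pairs, which is exactly why the realization status has to be checked before a transaction is created.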

Each time we successfully join two orders we are trying to create a transaction. The execute(...) method is called within the KStream join method. Firstly, we are comparing the prices of both orders. Then we verify the realization status of both orders by accessing the H2 database. If the orders are still not fully realized we may create a transaction and update orders records in the database.

private Transaction execute(Order orderBuy, Order orderSell) {
   if (orderBuy.getAmount() >= orderSell.getAmount()) {
      int count = Math.min(orderBuy.getProductCount(), 
                           orderSell.getProductCount());
      boolean allowed = logic
         .performUpdate(orderBuy.getId(), orderSell.getId(), count);
      if (!allowed)
         return null;
      else
         return new Transaction(
            ++transactionId,
            orderBuy.getId(),
            orderSell.getId(),
            count,
            (orderBuy.getAmount() + orderSell.getAmount()) / 2,
            LocalDateTime.now(),
            "NEW"
      );
   } else {
            return null;
   }
}

Let’s take a closer look at the performUpdate() method called inside the execute() method. It initiates a transaction and locks both Order entities. Then it verifies each order realization status and updates it with the current values if possible. Only if the performUpdate() method finishes successfully the stock-service application creates a new transaction.

@ApplicationScoped
public class OrderLogic {

    @Inject
    Logger log;
    @Inject
    OrderRepository repository;

    @Transactional
    public Order add(Order order) {
        repository.persist(order);
        return order;
    }

    @Transactional
    public boolean performUpdate(Long buyOrderId, Long sellOrderId, int amount) {
        Order buyOrder = repository.findById(buyOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        Order sellOrder = repository.findById(sellOrderId, 
           LockModeType.PESSIMISTIC_WRITE);
        if (buyOrder == null || sellOrder == null)
            return false;
        int buyAvailableCount = 
           buyOrder.getProductCount() - buyOrder.getRealizedCount();
        int sellAvailableCount = 
           sellOrder.getProductCount() - sellOrder.getRealizedCount();
        if (buyAvailableCount >= amount && sellAvailableCount >= amount) {
            buyOrder.setRealizedCount(buyOrder.getRealizedCount() + amount);
            sellOrder.setRealizedCount(sellOrder.getRealizedCount() + amount);
            repository.persist(buyOrder);
            repository.persist(sellOrder);
            return true;
        } else {
            return false;
        }
    }
}
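The effect of performUpdate() on partially realized orders is easier to follow with concrete numbers. Below is an in-memory sketch under clear assumptions: the RealizationSketch class is hypothetical, a HashMap replaces the H2 database, and synchronized methods stand in for the transaction with pessimistic locks.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory stand-in for the database-backed logic; all names are illustrative.
class RealizationSketch {

    // Mutable order state: ordered quantity and the part already realized.
    static final class OrderState {
        final int productCount;
        int realizedCount;

        OrderState(int productCount) {
            this.productCount = productCount;
        }

        int available() {
            return productCount - realizedCount;
        }
    }

    private final Map<Long, OrderState> orders = new HashMap<>();

    synchronized void add(long id, int productCount) {
        orders.put(id, new OrderState(productCount));
    }

    // 'synchronized' plays the role of the lock: only one caller at a time
    // may check and update the realization status of the two orders.
    synchronized boolean performUpdate(long buyId, long sellId, int amount) {
        OrderState buy = orders.get(buyId);
        OrderState sell = orders.get(sellId);
        if (buy == null || sell == null) {
            return false;
        }
        if (buy.available() < amount || sell.available() < amount) {
            return false; // one of the orders has too little left
        }
        buy.realizedCount += amount;
        sell.realizedCount += amount;
        return true;
    }

    synchronized int available(long id) {
        return orders.get(id).available();
    }

    public static void main(String[] args) {
        RealizationSketch logic = new RealizationSketch();
        logic.add(1, 200); // buy order for 200 items
        logic.add(2, 100); // three sell orders for 100 items each
        logic.add(3, 100);
        logic.add(4, 100);
        System.out.println(logic.performUpdate(1, 2, 100)); // buy order partially realized
        System.out.println(logic.performUpdate(1, 3, 100)); // buy order fully realized
        System.out.println(logic.performUpdate(1, 4, 100)); // rejected, nothing left to buy
    }
}
```

The third call is rejected because the buy order has no products left, so no transaction would be created for that pair.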

Nice 🙂 That’s all that we need to do in the first part of our exercise. Now we can run both our sample applications.

Run and manage Kafka Streams application with Quarkus

As I mentioned before, we first need to start the order-service. It runs a new Kafka instance and creates all required topics. Immediately after startup, it is ready to send new orders. To run the Quarkus app locally just go to the order-service directory and execute the following command:

$ mvn quarkus:dev

Just to verify, you can display a list of running Docker containers with the docker ps command. Here’s my result:

As you can see, the instance of Redpanda is running and is available on a random port, 49724. Quarkus did it for us. However, if you have Redpanda installed on your laptop, you can check the list of created topics with its CLI, rpk:

$ rpk topic list --brokers=127.0.0.1:49724

Then let’s run the stock-service. Go to the stock-service directory and run mvn quarkus:dev once again. After startup, it just works. Both applications share the same instance thanks to the Quarkus Dev Services. Now let’s access the Quarkus Dev UI console available at http://localhost:8080/q/dev/. Find the tile with the “Apache Kafka Streams” title.

You can check a visualization of our Kafka Streams topology. I will divide the image into two parts for better visibility.

Use Kafka KTable with Quarkus

We have already finished the implementation of the logic responsible for creating transactions from incoming orders. In the next step, we are going to perform analytical operations on the transactions stream. Our main goal is to calculate total number of transactions, total number of products sold/bought, and total value of transactions (price * productsCount) per each product. Here’s the object class used in calculations.

@RegisterForReflection
public class TransactionTotal {
   private int count;
   private int amount;
   private int productCount;

   // GETTERS AND SETTERS
}

Because the Transaction object does not contain information about the product, we first need to join it with the order to access the productId. Then we group the stream by productId and invoke the aggregate method, which produces a KTable and allows us to perform some more complex calculations. In this particular case, we are calculating the number of all executed transactions, their volume of products, and their total value. The resulting KTable can be materialized as a state store. Thanks to that, we will be able to query it by the name defined by the TRANSACTIONS_PER_PRODUCT_SUMMARY variable.

KeyValueBytesStoreSupplier storePerProductSupplier = Stores.persistentKeyValueStore(
   TRANSACTIONS_PER_PRODUCT_SUMMARY);

builder.stream(TRANSACTIONS_TOPIC, Consumed.with(Serdes.Long(), transactionSerde))
   .selectKey((k, v) -> v.getSellOrderId())
   .join(orders.selectKey((k, v) -> v.getId()),
      (t, o) -> new TransactionWithProduct(t, o.getProductId()),
      JoinWindows.of(Duration.ofSeconds(10)),
      StreamJoined.with(Serdes.Long(), transactionSerde, orderSerde))
   .groupBy((k, v) -> v.getProductId(), Grouped.with(Serdes.Integer(), transactionWithProductSerde))
   .aggregate(
      TransactionTotal::new,
      (k, v, a) -> {
         a.setCount(a.getCount() + 1);
         // accumulate the number of products and the total value (count * price)
         a.setProductCount(a.getProductCount() + v.getTransaction().getAmount());
         a.setAmount(a.getAmount() +
            (v.getTransaction().getAmount() * v.getTransaction().getPrice()));
         return a;
      },
      Materialized.<Integer, TransactionTotal> as(storePerProductSupplier)
         .withKeySerde(Serdes.Integer())
         .withValueSerde(transactionTotalSerde))
   .toStream()
   .peek((k, v) -> log.infof("Total per product(%d): %s", k, v))
   .to(TRANSACTIONS_PER_PRODUCT_AGGREGATED_TOPIC, 
      Produced.with(Serdes.Integer(), transactionTotalSerde));
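What the aggregation is meant to compute per product can be checked with a few numbers in plain Java. This is only a sketch under assumptions: TotalsSketch, Trade and Total are hypothetical simplifications of the transaction and TransactionTotal classes, and a TreeMap takes the place of the state store.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of the per-product aggregation; not the Kafka Streams API.
class TotalsSketch {

    // Simplified transaction that already carries its productId.
    record Trade(int productId, int productCount, int price) {}

    // Running totals: number of transactions, products traded, total value.
    record Total(int count, int productCount, int amount) {
        static final Total ZERO = new Total(0, 0, 0);

        Total add(Trade t) {
            return new Total(
                count + 1,
                productCount + t.productCount(),
                amount + t.productCount() * t.price());
        }
    }

    // Folds all trades into one Total per productId.
    static Map<Integer, Total> aggregate(List<Trade> trades) {
        Map<Integer, Total> totals = new TreeMap<>();
        for (Trade t : trades) {
            Total current = totals.getOrDefault(t.productId(), Total.ZERO);
            totals.put(t.productId(), current.add(t));
        }
        return totals;
    }

    public static void main(String[] args) {
        List<Trade> trades = List.of(
            new Trade(1, 100, 10),
            new Trade(1, 200, 12),
            new Trade(2, 50, 20));
        aggregate(trades).forEach((productId, total) ->
            System.out.println(productId + ": " + total.count() + " transactions, "
                + total.productCount() + " products, value " + total.amount()));
    }
}
```

For product 1 the two trades add up to 300 products with a total value of 100 * 10 + 200 * 12 = 3400.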

Here’s the class responsible for the interactive queries implementation. It injects the KafkaStreams bean. Then it tries to obtain the persistent store based on the StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY variable. As a result, there is a ReadOnlyKeyValueStore with Integer as the key and TransactionTotal as the value. We may return a single value related to a particular productId (getTransactionsPerProductData) or return a map with results for all available products (getAllTransactionsPerProductData).

@ApplicationScoped
public class InteractiveQueries {

   @Inject
   KafkaStreams streams;

   public TransactionTotal getTransactionsPerProductData(Integer productId) {
      return getTransactionsPerProductStore().get(productId);
   }

   public Map<Integer, TransactionTotal> getAllTransactionsPerProductData() {
      Map<Integer, TransactionTotal> m = new HashMap<>();
      // KeyValueIterator is Closeable, so close it to release the store resources
      try (KeyValueIterator<Integer, TransactionTotal> it = getTransactionsPerProductStore().all()) {
         while (it.hasNext()) {
            KeyValue<Integer, TransactionTotal> kv = it.next();
            m.put(kv.key, kv.value);
         }
      }
      return m;
   }

   private ReadOnlyKeyValueStore<Integer, TransactionTotal> getTransactionsPerProductStore() {
      return streams.store(
         StoreQueryParameters
            .fromNameAndType(StockService.TRANSACTIONS_PER_PRODUCT_SUMMARY, QueryableStoreTypes.keyValueStore()));
   }

}

Finally, we can create a REST controller responsible for exposing data retrieved by the interactive queries.

@ApplicationScoped
@Path("/transactions")
public class TransactionResource {

    @Inject
    InteractiveQueries interactiveQueries;

    @GET
    @Path("/products/{id}")
    public TransactionTotal getByProductId(@PathParam("id") Integer productId) {
        return interactiveQueries.getTransactionsPerProductData(productId);
    }

    @GET
    @Path("/products")
    public Map<Integer, TransactionTotal> getAllPerProductId() {
        return interactiveQueries.getAllTransactionsPerProductData();
    }

}

Now you can easily check out statistics related to the transactions created by the stock-service. You just need to call the following REST endpoints e.g.:

$ curl http://localhost:8080/transactions/products
$ curl http://localhost:8080/transactions/products/3
$ curl http://localhost:8080/transactions/products/5

Final Thoughts

Quarkus simplifies working with Kafka Streams and interactive queries. It provides useful improvements for developers like auto-start of Kafka in dev and test modes or Kafka streams visualization in dev UI console. You can easily compare the Quarkus approach with the Spring Cloud Stream Kafka support since I implemented the same logic for both those frameworks. Here’s the GitHub repository with Spring Cloud Stream Kafka Streams example.

Overview of Java Stream API Extensions
https://piotrminkowski.com/2019/10/04/overview-of-java-stream-api-extensions/
Fri, 04 Oct 2019 07:45:02 +0000

Stream API, introduced in Java 8, is probably still the most important new feature added to Java in the last several years. I think that every Java developer has had an opportunity to use the Java Stream API in their career, or rather, you probably use it on a day-to-day basis. However, if you compare its built-in functional programming features with those of some other languages, for example Kotlin, you will quickly realize that the number of methods provided by the Stream API is very limited. Therefore, the community has created several libraries just for extending the API offered by pure Java. Today I’m going to show the most interesting Java Stream API extensions offered by three popular libraries: StreamEx, jOOλ and Guava.

This article covers only sequential Java Stream extensions. If you use parallel Streams, you won’t be able to leverage jOOλ, since it is dedicated only to sequential Streams.

Dependencies

Here’s the list of current releases of all three libraries compared in this article.

<dependencies>
   <dependency>
      <groupId>one.util</groupId>
      <artifactId>streamex</artifactId>
      <version>0.7.0</version>
   </dependency>
   <dependency>
      <groupId>org.jooq</groupId>
      <artifactId>jool</artifactId>
      <version>0.9.13</version>
   </dependency>
   <dependency>
      <groupId>com.google.guava</groupId>
      <artifactId>guava</artifactId>
      <version>28.1-jre</version>
   </dependency>
</dependencies>

1. Zipping with Java Stream extensions

When working with Java Streams in more advanced applications, you will often process multiple streams, and they often contain different objects. One useful operation in that case is zipping. The zipping operation returns a stream of pairs of corresponding elements from two given streams, which means elements at the same position in those streams. Let’s consider two objects, Person and PersonAddress. Assuming we have two streams, the first containing only Person objects and the second containing PersonAddress objects, and the order of elements clearly indicates their association, we may zip them to create a new stream of objects containing all the fields from Person and PersonAddress. Here’s the picture that illustrates the described scenario.
[Figure: streams-zipping.png]
Zipping is supported by all three libraries described here. Let’s begin with the Guava example. It provides only one method dedicated to zipping: the static zip method, which takes three parameters: the first stream, the second stream and a mapping function.

Stream<Person> s1 = Stream.of(
   new Person(1, "John", "Smith"),
   new Person(2, "Tom", "Hamilton"),
   new Person(3, "Paul", "Walker")
);
Stream<PersonAddress> s2 = Stream.of(
   new PersonAddress(1, "London", "Street1", "100"),
   new PersonAddress(2, "Manchester", "Street1", "101"),
   new PersonAddress(3, "London", "Street2", "200")
);
Stream<PersonDTO> s3 = Streams.zip(s1, s2, (p, pa) -> PersonDTO.builder()
   .id(p.getId())
   .firstName(p.getFirstName())
   .lastName(p.getLastName())
   .city(pa.getCity())
   .street(pa.getStreet())
   .houseNo(pa.getHouseNo()).build());
s3.forEach(dto -> {
   Assertions.assertNotNull(dto.getId());
   Assertions.assertNotNull(dto.getFirstName());
   Assertions.assertNotNull(dto.getCity());
});

Both StreamEx and jOOλ offer more possibilities for zipping than Guava. We can choose between static methods and non-static methods invoked on a given stream. Let’s take a look at how we may perform it using the StreamEx zipWith method.

StreamEx<Person> s1 = StreamEx.of(
   new Person(1, "John", "Smith"),
   new Person(2, "Tom", "Hamilton"),
   new Person(3, "Paul", "Walker")
);
StreamEx<PersonAddress> s2 = StreamEx.of(
   new PersonAddress(1, "London", "Street1", "100"),
   new PersonAddress(2, "Manchester", "Street1", "101"),
   new PersonAddress(3, "London", "Street2", "200")
);
StreamEx<PersonDTO> s3 = s1.zipWith(s2, (p, pa) -> PersonDTO.builder()
   .id(p.getId())
   .firstName(p.getFirstName())
   .lastName(p.getLastName())
   .city(pa.getCity())
   .street(pa.getStreet())
   .houseNo(pa.getHouseNo()).build());
s3.forEach(dto -> {
   Assertions.assertNotNull(dto.getId());
   Assertions.assertNotNull(dto.getFirstName());
   Assertions.assertNotNull(dto.getCity());
});

The jOOλ example is almost identical. Here we have a zip method called on a given stream.

Seq<Person> s1 = Seq.of(
   new Person(1, "John", "Smith"),
   new Person(2, "Tom", "Hamilton"),
   new Person(3, "Paul", "Walker"));
Seq<PersonAddress> s2 = Seq.of(
   new PersonAddress(1, "London", "Street1", "100"),
   new PersonAddress(2, "Manchester", "Street1", "101"),
   new PersonAddress(3, "London", "Street2", "200"));
Seq<PersonDTO> s3 = s1.zip(s2, (p, pa) -> PersonDTO.builder()
   .id(p.getId())
   .firstName(p.getFirstName())
   .lastName(p.getLastName())
   .city(pa.getCity())
   .street(pa.getStreet())
   .houseNo(pa.getHouseNo()).build());
s3.forEach(dto -> {
   Assertions.assertNotNull(dto.getId());
   Assertions.assertNotNull(dto.getFirstName());
   Assertions.assertNotNull(dto.getCity());
});
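For comparison, here is a minimal sketch of what a zip operation does, written with the plain JDK only. The ZipSketch class and its zip helper are hypothetical, it returns a List instead of a lazy stream to keep the code short, and it stops as soon as the shorter input runs out.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Stream;

// JDK-only model of zipping; the libraries above provide this out of the box.
class ZipSketch {

    // Combines elements at the same position until one of the inputs is exhausted.
    static <A, B, R> List<R> zip(Stream<A> first, Stream<B> second,
                                 BiFunction<A, B, R> combiner) {
        Iterator<A> a = first.iterator();
        Iterator<B> b = second.iterator();
        List<R> result = new ArrayList<>();
        while (a.hasNext() && b.hasNext()) {
            result.add(combiner.apply(a.next(), b.next()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> zipped = zip(
            Stream.of("John", "Tom", "Paul"),
            Stream.of("London", "Manchester", "London"),
            (name, city) -> name + " lives in " + city);
        zipped.forEach(System.out::println);
    }
}
```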

2. Joining with Java Stream extensions

The zipping operation merges elements from two different streams according to their order in those streams. What if we would like to associate elements based on their fields, like id, rather than on their order in a stream? Something like a LEFT JOIN or RIGHT JOIN between two entities. The result of the operation should be the same as in the previous section: a new stream of objects containing all the fields from Person and PersonAddress. The described operation is illustrated in the picture below.
[Figure: java-stream-extensions-join.png]
When it comes to the join operation, only jOOλ provides methods for that. Since it is dedicated to object-oriented queries, we may choose between many join options. For example, there are innerJoin, leftOuterJoin, rightOuterJoin and crossJoin methods. In the source code below you can see an example of innerJoin usage. This method takes two parameters: the stream to join and a predicate for matching elements from the first stream with elements from the joined stream. If we would like to create a new object based on the innerJoin result, we should additionally invoke the map operation.

Seq<Person> s1 = Seq.of(
      new Person(1, "John", "Smith"),
      new Person(2, "Tom", "Hamilton"),
      new Person(3, "Paul", "Walker"));
Seq<PersonAddress> s2 = Seq.of(
      new PersonAddress(2, "London", "Street1", "100"),
      new PersonAddress(3, "Manchester", "Street1", "101"),
      new PersonAddress(1, "London", "Street2", "200"));
Seq<PersonDTO> s3 = s1.innerJoin(s2, (p, pa) -> p.getId().equals(pa.getId())).map(t -> PersonDTO.builder()
      .id(t.v1.getId())
      .firstName(t.v1.getFirstName())
      .lastName(t.v1.getLastName())
      .city(t.v2.getCity())
      .street(t.v2.getStreet())
      .houseNo(t.v2.getHouseNo()).build());
s3.forEach(dto -> {
   Assertions.assertNotNull(dto.getId());
   Assertions.assertNotNull(dto.getFirstName());
   Assertions.assertNotNull(dto.getCity());
});
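The semantics of an inner join can also be sketched with the plain JDK: every element of the left side is paired with every element of the right side that satisfies the predicate. The InnerJoinSketch class and its records are hypothetical, and the right side is a List because it has to be traversed once per left element.

```java
import java.util.List;
import java.util.function.BiPredicate;
import java.util.stream.Collectors;

// JDK-only model of an inner join; not the jOOλ implementation.
class InnerJoinSketch {

    record Person(int id, String firstName) {}

    record Address(int id, String city) {}

    record Joined(Person person, Address address) {}

    // Keeps only the (person, address) pairs accepted by the predicate.
    static List<Joined> innerJoin(List<Person> left, List<Address> right,
                                  BiPredicate<Person, Address> on) {
        return left.stream()
            .flatMap(p -> right.stream()
                .filter(a -> on.test(p, a))
                .map(a -> new Joined(p, a)))
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Person> people = List.of(
            new Person(1, "John"), new Person(2, "Tom"), new Person(3, "Paul"));
        List<Address> addresses = List.of(
            new Address(2, "London"), new Address(3, "Manchester"), new Address(1, "London"));
        innerJoin(people, addresses, (p, a) -> p.id() == a.id())
            .forEach(j -> System.out.println(j.person().firstName() + " -> " + j.address().city()));
    }
}
```

Elements without a match on the other side are simply dropped, which is the difference from the outer join variants.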

3. Grouping with Java Stream extensions

The next useful operation is grouping, which the Java Stream API supports only through the static groupingBy method in java.util.stream.Collectors (s1.collect(Collectors.groupingBy(PersonDTO::getCity))). As a result of executing such an operation on a stream, you get a map whose keys are the values resulting from applying the grouping function to the input elements, and whose corresponding values are lists containing the input elements. This operation is a kind of aggregation, so you get a java.util.List as a result, not a java.util.stream.Stream.
Both StreamEx and jOOλ provide methods for grouping streams. Let’s start with the StreamEx groupingBy operation example. Assuming we have an input stream of PersonDTO objects, we will group them by the person’s home city.

StreamEx<PersonDTO> s1 = StreamEx.of(
   PersonDTO.builder().id(1).firstName("John").lastName("Smith").city("London").street("Street1").houseNo("100").build(),
   PersonDTO.builder().id(2).firstName("Tom").lastName("Hamilton").city("Manchester").street("Street1").houseNo("101").build(),
   PersonDTO.builder().id(3).firstName("Paul").lastName("Walker").city("London").street("Street2").houseNo("200").build(),
   PersonDTO.builder().id(4).firstName("Joan").lastName("Collins").city("Manchester").street("Street2").houseNo("201").build()
);
Map<String, List<PersonDTO>> m = s1.groupingBy(PersonDTO::getCity);
Assertions.assertNotNull(m.get("London"));
Assertions.assertTrue(m.get("London").size() == 2);
Assertions.assertNotNull(m.get("Manchester"));
Assertions.assertTrue(m.get("Manchester").size() == 2);

The result of similar jOOλ groupBy method is the same. It also returns multiple java.util.List objects inside map.


Seq<PersonDTO> s1 = Seq.of(
      PersonDTO.builder().id(1).firstName("John").lastName("Smith").city("London").street("Street1").houseNo("100").build(),
      PersonDTO.builder().id(2).firstName("Tom").lastName("Hamilton").city("Manchester").street("Street1").houseNo("101").build(),
      PersonDTO.builder().id(3).firstName("Paul").lastName("Walker").city("London").street("Street2").houseNo("200").build(),
      PersonDTO.builder().id(4).firstName("Joan").lastName("Collins").city("Manchester").street("Street2").houseNo("201").build()
);
Map<String, List<PersonDTO>> m = s1.groupBy(PersonDTO::getCity);
Assertions.assertNotNull(m.get("London"));
Assertions.assertEquals(2, m.get("London").size());
Assertions.assertNotNull(m.get("Manchester"));
Assertions.assertEquals(2, m.get("Manchester").size());

4. Multiple Concatenation

That’s a pretty simple scenario. The Java Stream API provides a static method for concatenation, but only for two streams. Sometimes it is convenient to concatenate multiple streams in a single step. Guava and jOOλ each provide a dedicated method for that.
Here’s an example of calling the concat method with jOOλ:

Seq<Integer> s1 = Seq.of(1, 2, 3);
Seq<Integer> s2 = Seq.of(4, 5, 6);
Seq<Integer> s3 = Seq.of(7, 8, 9);
Seq<Integer> s4 = Seq.concat(s1, s2, s3);
Assertions.assertEquals(9, s4.count());

And here’s a similar example for Guava:

Stream<Integer> s1 = Stream.of(1, 2, 3);
Stream<Integer> s2 = Stream.of(4, 5, 6);
Stream<Integer> s3 = Stream.of(7, 8, 9);
Stream<Integer> s4 = Streams.concat(s1, s2, s3);
Assertions.assertEquals(9, s4.count());
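
For comparison, here is a rough sketch of how the same result could be obtained with the JDK alone, by flattening a stream of streams instead of nesting Stream.concat calls. The class name ConcatSketch and the helper concatAll are hypothetical names introduced for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class ConcatSketch {

    // Concatenates any number of streams by flattening them in argument order.
    @SafeVarargs
    static <T> Stream<T> concatAll(Stream<T>... streams) {
        return Arrays.stream(streams).flatMap(s -> s);
    }

    public static void main(String[] args) {
        Stream<Integer> s1 = Stream.of(1, 2, 3);
        Stream<Integer> s2 = Stream.of(4, 5, 6);
        Stream<Integer> s3 = Stream.of(7, 8, 9);

        List<Integer> all = concatAll(s1, s2, s3).collect(Collectors.toList());
        System.out.println(all); // [1, 2, 3, 4, 5, 6, 7, 8, 9]
    }
}
```

The flatMap approach also avoids building a chain of nested Stream.concat calls, which the Stream.concat Javadoc warns can lead to deep call chains when repeated many times.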

5. Partitioning

The partitioning operation is very similar to grouping, but it divides the input stream into two lists or streams: elements in the first fulfill a given predicate, while elements in the second do not.
The StreamEx partitioningBy method returns two List objects inside a Map.

StreamEx<PersonDTO> s1 = StreamEx.of(
      PersonDTO.builder().id(1).firstName("John").lastName("Smith").city("London").street("Street1").houseNo("100").build(),
      PersonDTO.builder().id(2).firstName("Tom").lastName("Hamilton").city("Manchester").street("Street1").houseNo("101").build(),
      PersonDTO.builder().id(3).firstName("Paul").lastName("Walker").city("London").street("Street2").houseNo("200").build(),
      PersonDTO.builder().id(4).firstName("Joan").lastName("Collins").city("Manchester").street("Street2").houseNo("201").build()
);
Map<Boolean, List<PersonDTO>> m = s1.partitioningBy(dto -> dto.getStreet().equals("Street1"));
Assertions.assertEquals(2, m.get(true).size());
Assertions.assertEquals(2, m.get(false).size());

In contrast to StreamEx, jOOλ returns two streams (Seq) inside a Tuple2 object. This approach has one big advantage over StreamEx: you can still invoke stream operations on the result without any conversion.


Seq<PersonDTO> s1 = Seq.of(
      PersonDTO.builder().id(1).firstName("John").lastName("Smith").city("London").street("Street1").houseNo("100").build(),
      PersonDTO.builder().id(2).firstName("Tom").lastName("Hamilton").city("Manchester").street("Street1").houseNo("101").build(),
      PersonDTO.builder().id(3).firstName("Paul").lastName("Walker").city("London").street("Street2").houseNo("200").build(),
      PersonDTO.builder().id(4).firstName("Joan").lastName("Collins").city("Manchester").street("Street2").houseNo("201").build()
);
Tuple2<Seq<PersonDTO>, Seq<PersonDTO>> t = s1.partition(dto -> dto.getStreet().equals("Street1"));
Assertions.assertEquals(2, t.v1.count());
Assertions.assertEquals(2, t.v2.count());
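
As a baseline for both libraries, the following sketch shows partitioning with the JDK's own Collectors.partitioningBy. Like StreamEx, it produces a map of lists, so continuing with stream operations takes an extra stream() call. The class name PartitionSketch and the helper partition are assumptions made for this example, and plain street names stand in for PersonDTO objects.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class PartitionSketch {

    // Splits a stream into matching (true) and non-matching (false) elements.
    static <T> Map<Boolean, List<T>> partition(Stream<T> source, Predicate<? super T> predicate) {
        return source.collect(Collectors.partitioningBy(predicate));
    }

    public static void main(String[] args) {
        Map<Boolean, List<String>> byStreet =
                partition(Stream.of("Street1", "Street1", "Street2", "Street2"), "Street1"::equals);
        System.out.println(byStreet.get(true).size());  // 2
        System.out.println(byStreet.get(false).size()); // 2

        // The values are lists, so stream() is needed again to keep processing.
        long endingWithTwo = byStreet.get(false).stream().filter(s -> s.endsWith("2")).count();
        System.out.println(endingWithTwo); // 2
    }
}
```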

6. Aggregation

Among the libraries described here, only jOOλ provides methods for stream aggregation. For example, we can compute a sum, an average, or a median. Since jOOλ is a part of jOOQ, it is targeted at object-oriented queries and, in fact, provides many operations that correspond to SQL SELECT clauses.
The fragment of source code below shows how easily we can compute the sum of a selected field in a stream of objects, using the ages of all the persons as an example.

Seq<Person> s1 = Seq.of(
   new Person(1, "John", "Smith", 35),
   new Person(2, "Tom", "Hamilton", 45),
   new Person(3, "Paul", "Walker", 20)
);
Optional<Integer> sum = s1.sum(Person::getAge);
Assertions.assertEquals(100, sum.get());
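
To put the jOOλ call in context, here is a sketch of how a comparable sum could be computed with the JDK alone, by mapping the objects to a primitive IntStream first. The Person class below is a hypothetical, simplified stand-in (name and age only), and AggregationSketch, totalAge and ageStatistics are names assumed for this illustration.

```java
import java.util.Arrays;
import java.util.IntSummaryStatistics;
import java.util.List;

class AggregationSketch {

    // Hypothetical, simplified stand-in for the article's Person class.
    static final class Person {
        private final String firstName;
        private final int age;

        Person(String firstName, int age) {
            this.firstName = firstName;
            this.age = age;
        }

        String getFirstName() { return firstName; }
        int getAge() { return age; }
    }

    // Sums the ages by switching to a primitive IntStream.
    static int totalAge(List<Person> people) {
        return people.stream().mapToInt(Person::getAge).sum();
    }

    // Collects count, sum, min, max and average in a single pass.
    static IntSummaryStatistics ageStatistics(List<Person> people) {
        return people.stream().mapToInt(Person::getAge).summaryStatistics();
    }

    public static void main(String[] args) {
        List<Person> people = Arrays.asList(
                new Person("John", 35),
                new Person("Tom", 45),
                new Person("Paul", 20));

        System.out.println(totalAge(people));               // 100
        System.out.println(ageStatistics(people).getMin()); // 20
        System.out.println(ageStatistics(people).getMax()); // 45
    }
}
```

One difference worth noting: IntStream.sum() returns 0 for an empty stream, while the jOOλ call above returns an Optional.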

7. Pairing

StreamEx allows you to process pairs of adjacent objects in the stream and apply a given function to them. You can achieve this with the pairMap function. In the fragment of code below, I compute the sum of each pair of adjacent numbers in the stream.

StreamEx<Integer> s1 = StreamEx.of(1, 2, 1, 2, 1);
StreamEx<Integer> s2 = s1.pairMap(Integer::sum);
s2.forEach(i -> Assertions.assertEquals(3, i));
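
To make the pairing semantics concrete, here is a rough JDK-only approximation of the pairMap idea. It works on a list rather than a stream and walks the indices of adjacent elements. It is not how StreamEx implements the operation; PairingSketch and its pairMap helper are hypothetical names used only for this sketch.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.BiFunction;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class PairingSketch {

    // Applies the mapper to every pair of adjacent elements: (0,1), (1,2), (2,3), ...
    // A list of n elements yields n - 1 results; fewer than two elements yield none.
    static <T, R> List<R> pairMap(List<T> items, BiFunction<T, T, R> mapper) {
        return IntStream.range(0, Math.max(0, items.size() - 1))
                .mapToObj(i -> mapper.apply(items.get(i), items.get(i + 1)))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Integer> sums = pairMap(Arrays.asList(1, 2, 1, 2, 1), Integer::sum);
        System.out.println(sums); // [3, 3, 3, 3]
    }
}
```

Five input elements produce four sums, which is why every value checked in the StreamEx example equals 3.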

Summary

While Guava Streams is just a part of Google’s bigger library, StreamEx and jOOλ are strictly dedicated to lambda streams. Compared to the other libraries described in this article, jOOλ provides the largest number of features and operations. If you are looking for a library that helps you perform OO operations on streams, jOOλ is definitely for you. Unlike the others, it provides operations for, among other things, joining and aggregation. StreamEx also provides many useful operations for manipulating streams. It is not related to object-oriented queries and SQL, so you won’t find methods for out-of-order joins or aggregation there, which does not change the fact that it is a very useful library and worth recommending. Guava provides a relatively small number of features for streams. However, if you already use it in your application, it could be a nice addition for manipulating streams. The source code snippets with usage examples may be found on GitHub in the repository https://github.com/piomin/sample-java-playground.git.

The post Overview of Java Stream API Extensions appeared first on Piotr's TechBlog.

]]>