In the AngularPortfolioMgr project, the calculation of the quote difference in percent is stateful, and it has been improved with a Gatherer. Stream Gatherers became a final feature in Java 24. They enable stateful operations inside a stream. Workarounds such as value holders that are declared outside the stream and then mutated inside it are no longer needed.
Java Stream Gatherers
Gatherers were created to enable stateful operations that span multiple items of a stream. To do that, a gatherer can define these steps:
- initializer to create the object that holds the state
- integrator to perform the logic and push results downstream
- combiner to merge the states of multiple parallel evaluations
- finisher to handle the state left over after the last stream item has been integrated
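The following minimal sketch shows a gatherer that uses all four steps, assuming Java 24 or later. The class name ‘MaxGatherer’ and the method ‘maxOf()’ are hypothetical and not part of the project. The initializer creates a state object, the integrator updates it without pushing anything, the combiner merges the states of parallel substreams, and the finisher pushes the single result.

```java
import java.util.List;
import java.util.stream.Gatherer;

// Hypothetical example: emits the maximum element once the stream is exhausted.
class MaxGatherer {

    // State object created by the initializer, one instance per evaluated substream.
    static final class MaxState {
        Integer max; // null until the first element arrives
    }

    static Gatherer<Integer, MaxState, Integer> maxOf() {
        return Gatherer.<Integer, MaxState, Integer>of(
                // Initializer: creates a fresh, empty state
                MaxState::new,
                // Integrator: updates the state, pushes nothing yet, keeps consuming
                (state, element, downstream) -> {
                    if (state.max == null || element > state.max) {
                        state.max = element;
                    }
                    return true;
                },
                // Combiner: merges the states of two parallel substreams
                (left, right) -> {
                    if (left.max == null) {
                        return right;
                    }
                    if (right.max != null && right.max > left.max) {
                        left.max = right.max;
                    }
                    return left;
                },
                // Finisher: pushes the result after all elements have been integrated
                (state, downstream) -> {
                    if (state.max != null) {
                        downstream.push(state.max);
                    }
                });
    }

    public static void main(String[] args) {
        List<Integer> result = List.of(3, 9, 4, 7).parallelStream()
                .gather(maxOf())
                .toList();
        System.out.println(result); // [9]
    }
}
```

Because a combiner is supplied, the gatherer may be evaluated in parallel. The integrator could additionally be wrapped with ‘Gatherer.Integrator.ofGreedy(…)’ to signal that it never short-circuits; that is left out to keep the sketch short.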
Together, these steps allow flexible handling of stateful operations in a stream. One of the built-in Gatherers is ‘windowFixed(…)’. It receives the window size, and its initializer creates the collection for the current window. The integrator adds items to that collection until the window size is reached, then pushes the window downstream and starts a new one. The combiner only matters for parallel evaluation; because fixed windows depend on encounter order, ‘windowFixed(…)’ is evaluated sequentially. The finisher handles the remaining items that do not fill a complete window and pushes them downstream as a final, shorter window.
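To make the description of ‘windowFixed(…)’ concrete, here is a simplified sketch of a fixed-window gatherer. It is not the JDK source: the helper name ‘fixedWindows(…)’ is hypothetical, and the JDK implementation differs in detail. The state is an ‘ArrayList’, the integrator pushes a copy once the window is full, and the finisher pushes whatever is left.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

class FixedWindowSketch {

    // Hypothetical helper that mimics Gatherers.windowFixed(size) for illustration.
    static <T> Gatherer<T, ?, List<T>> fixedWindows(int size) {
        if (size < 1) {
            throw new IllegalArgumentException("size must be greater than zero");
        }
        return Gatherer.<T, List<T>, List<T>>ofSequential(
                // Initializer: the current, partially filled window
                ArrayList::new,
                // Integrator: add the element and push the window once it is full
                (window, element, downstream) -> {
                    window.add(element);
                    if (window.size() < size) {
                        return true;
                    }
                    List<T> full = new ArrayList<>(window);
                    window.clear(); // start the next window
                    return downstream.push(full);
                },
                // Finisher: push the last, possibly shorter, window
                (window, downstream) -> {
                    if (!window.isEmpty()) {
                        downstream.push(new ArrayList<>(window));
                    }
                });
    }

    public static void main(String[] args) {
        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(fixedWindows(2)).toList());
        // [[1, 2], [3, 4], [5]]
        System.out.println(Stream.of(1, 2, 3, 4, 5).gather(Gatherers.windowFixed(2)).toList());
        // [[1, 2], [3, 4], [5]]
    }
}
```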
One use case is splitting a collection into a collection of collections that serve as parameters for SQL IN clauses. Oracle databases support at most 1000 expressions in an IN list. To split a collection into chunks below that limit, the Gatherer ‘windowFixed(…)’ can be used. The NewsFeedService uses such a Gatherer:
```java
...
final var companyReports = companyReportsStream
        .gather(Gatherers.windowFixed(999)).toList();
final var symbols = companyReports.stream()
        .flatMap(myCompanyReports -> this.symbolRepository
                .findBySymbolIn(myCompanyReports.stream()
                        .map(SymbolToCikWrapperDto.CompanySymbolDto::getTicker).toList())
...
```

With these steps, many stateful operations can be handled inside the stream, and state outside the stream becomes mostly unnecessary. The stream implementation becomes cleaner, and because the pipeline no longer mutates captured variables as a side effect, HotSpot has an easier job optimizing it.
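The IN-clause batching from the NewsFeedService can be sketched without a database. In this sketch the repository call is replaced by a hypothetical ‘Function’ parameter named ‘findByIn’; in the project that role is played by ‘symbolRepository.findBySymbolIn(…)’. Each window of at most 999 tickers results in one query, and the results are flattened back into one list.

```java
import java.util.List;
import java.util.function.Function;
import java.util.stream.Gatherers;
import java.util.stream.IntStream;

class InClauseBatching {

    // Oracle accepts at most 1000 expressions in an IN list; 999 mirrors the article.
    static final int MAX_IN_PARAMS = 999;

    // Hypothetical stand-in for a repository method such as findBySymbolIn(...):
    // findByIn receives one batch of tickers and returns the matching rows.
    static <R> List<R> findInBatches(List<String> tickers,
                                     Function<List<String>, List<R>> findByIn) {
        return tickers.stream()
                .gather(Gatherers.windowFixed(MAX_IN_PARAMS))     // batches of <= 999 tickers
                .flatMap(batch -> findByIn.apply(batch).stream()) // one query per batch
                .toList();
    }

    public static void main(String[] args) {
        List<String> tickers = IntStream.range(0, 2500).mapToObj(i -> "T" + i).toList();
        // The fake query returns the size of each batch it received.
        List<Integer> batchSizes = findInBatches(tickers, batch -> List.of(batch.size()));
        System.out.println(batchSizes); // [999, 999, 502]
    }
}
```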
A use case for a Java Stream Gatherer
The use case for a stream Gatherer here is calculating the percentage relation between consecutive closing prices of stock quotes. To calculate it, the previous quote is needed. This was the implementation before Java 24:
```java
private LinkedHashMap<LocalDate, BigDecimal> calcClosePercentages(
        List<DailyQuote> portfolioQuotes, final LocalDate cutOffDate) {
    record DateToCloseAdjPercent(LocalDate localDate, BigDecimal closeAdjPercent) { }
    // Mutable holder declared outside the stream: a lambda can only capture
    // effectively final locals, so the previous close is stored inside it.
    final var lastValue = new AtomicReference<BigDecimal>(new BigDecimal(-1000L));
    final var closeAdjPercents = portfolioQuotes.stream()
            .filter(myQuote -> cutOffDate.isAfter(myQuote.getLocalDay()))
            .map(myQuote -> {
                var result = new BigDecimal(-1000L); // marker: no previous quote yet
                if (lastValue.get().longValue() > -900L) {
                    result = myQuote.getAdjClose()
                            .divide(lastValue.get(), 25, RoundingMode.HALF_EVEN)
                            .multiply(new BigDecimal(100L));
                }
                lastValue.set(myQuote.getAdjClose()); // side effect on outside state
                return new DateToCloseAdjPercent(myQuote.getLocalDay(), result);
            })
            .sorted((a, b) -> a.localDate().compareTo(b.localDate()))
            // Drop the -1000 marker and keep the real percentages
            .filter(myValue -> myValue.closeAdjPercent().longValue() > -900L)
            .collect(Collectors.toMap(DateToCloseAdjPercent::localDate,
                    DateToCloseAdjPercent::closeAdjPercent,
                    (x, y) -> y, LinkedHashMap::new));
    return closeAdjPercents;
}
```

The ‘lastValue’ is stored outside of the stream in an AtomicReference. The reference itself is final because local variables used inside a lambda must be effectively final; the AtomicReference serves as a mutable holder for the previous close. The result value is initialized with -1000 as a marker: quotes cannot be negative, so no real result can be smaller than -100. The marker values are filtered out before the quotes are collected, by keeping only percentages greater than -900.
The Java 24 implementation with Gatherers in the PortfolioStatisticService:
```java
// Moved to class level because both methods use it
record DateToCloseAdjPercent(LocalDate localDate, BigDecimal closeAdjPercent) { }

private LinkedHashMap<LocalDate, BigDecimal> calcClosePercentages(
        List<DailyQuote> portfolioQuotes, final LocalDate cutOffDate) {
    final var closeAdjPercents = portfolioQuotes.stream()
            .filter(myQuote -> cutOffDate.isAfter(myQuote.getLocalDay()))
            .gather(calcClosePercentage())
            .sorted((a, b) -> a.localDate().compareTo(b.localDate()))
            .collect(Collectors.toMap(DateToCloseAdjPercent::localDate,
                    DateToCloseAdjPercent::closeAdjPercent,
                    (x, y) -> y, LinkedHashMap::new));
    return closeAdjPercents;
}

private static Gatherer<DailyQuote, AtomicReference<BigDecimal>, DateToCloseAdjPercent>
        calcClosePercentage() {
    return Gatherer.ofSequential(
            // Initializer: the state holds the previous close, -1000 means "none yet"
            () -> new AtomicReference<>(new BigDecimal(-1000L)),
            // Integrator
            (state, element, downstream) -> {
                var result = true;
                if (state.get().longValue() > -900L) {
                    var resultPercentage = element.getAdjClose()
                            .divide(state.get(), 25, RoundingMode.HALF_EVEN)
                            .multiply(new BigDecimal(100L));
                    result = downstream.push(new DateToCloseAdjPercent(
                            element.getLocalDay(), resultPercentage));
                }
                state.set(element.getAdjClose());
                return result;
            });
}
```

In the method ‘calcClosePercentages(…)’, the record ‘DateToCloseAdjPercent(…)’ has moved to class level because it is used in both methods. The map operator has been replaced with ‘.gather(calcClosePercentage())’. The filter for percentages smaller than -900 could be removed because the Gatherer never pushes the initial marker value downstream.
In the method ‘calcClosePercentage()’, the Gatherer is created with ‘Gatherer.ofSequential(…)’ because the calculation only works on ordered, sequentially processed quotes. First, the initializer supplier is created with the initial value ‘new BigDecimal(-1000L)’. Second, the integrator is created as a lambda with the parameters ‘(state, element, downstream)’. The ‘state’ parameter holds the state created by the initializer, ‘new AtomicReference<>(new BigDecimal(-1000L))’, and stores the previous close of the quote. The ‘element’ is the current quote used in the calculation. The ‘downstream’ is the next stage of the stream; it is used to push the ‘DateToCloseAdjPercent’ record. Values that are not pushed are effectively filtered out. The ‘result’ is a boolean that shows whether the stream accepts more values. It should be set to true or to the result of ‘downstream.push(…)’; returning false stops the processing of further elements. Finally, the ‘state’ is set to the current quote's close value for the next invocation of the integrator, and the result is returned to tell the stream whether more values are accepted.
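Because the state is created per stream evaluation and never escapes the Gatherer, it does not have to be an AtomicReference. The following sketch uses a plain holder class and ‘null’ for the case without a previous quote, instead of the -1000 marker. It assumes simplified, hypothetical records for ‘DailyQuote’ and ‘DateToCloseAdjPercent’ (the project's ‘DailyQuote’ exposes getters instead of record accessors); the calculation itself is the same as in the project.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.time.LocalDate;
import java.util.List;
import java.util.stream.Gatherer;

class ClosePercentageSketch {

    // Hypothetical, simplified stand-ins for the project's types.
    record DailyQuote(LocalDate localDay, BigDecimal adjClose) { }
    record DateToCloseAdjPercent(LocalDate localDate, BigDecimal closeAdjPercent) { }

    // State owned by the gatherer; null means "no previous quote yet".
    static final class PreviousClose {
        BigDecimal value;
    }

    static Gatherer<DailyQuote, PreviousClose, DateToCloseAdjPercent> calcClosePercentage() {
        return Gatherer.ofSequential(
                // Initializer: empty holder, so no marker value is needed
                PreviousClose::new,
                // Integrator: push a result only when a previous close exists
                (state, element, downstream) -> {
                    boolean more = true;
                    if (state.value != null) {
                        BigDecimal percent = element.adjClose()
                                .divide(state.value, 25, RoundingMode.HALF_EVEN)
                                .multiply(BigDecimal.valueOf(100L));
                        more = downstream.push(
                                new DateToCloseAdjPercent(element.localDay(), percent));
                    }
                    state.value = element.adjClose(); // previous close for the next quote
                    return more;
                });
    }

    public static void main(String[] args) {
        List<DailyQuote> quotes = List.of(
                new DailyQuote(LocalDate.of(2025, 1, 2), new BigDecimal("100")),
                new DailyQuote(LocalDate.of(2025, 1, 3), new BigDecimal("110")),
                new DailyQuote(LocalDate.of(2025, 1, 6), new BigDecimal("99")));
        quotes.stream()
                .gather(calcClosePercentage())
                .forEach(r -> System.out.println(r.localDate() + " "
                        + r.closeAdjPercent().stripTrailingZeros().toPlainString()));
        // 2025-01-03 110
        // 2025-01-06 90
    }
}
```

The first quote only initializes the state, so n quotes produce n - 1 results, and no marker has to be filtered out afterwards.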
Conclusion
This is only one of the use cases that can be improved with Gatherers. Using value references outside the stream for stateful operations is quite common and is no longer needed. Because the state is confined to the Gatherer instead of being changed as a side effect of a lambda, the pipeline is easier to reason about and gives HotSpot more room to optimize. With the Gatherers API, Java has filled a gap in the Stream API and now enables elegant solutions for stateful use cases.
Java offers built-in Gatherers such as ‘Gatherers.windowSliding(…)’ and ‘Gatherers.windowFixed(…)’, … that help solve common use cases.
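As an alternative sketch, not the project's implementation, ‘Gatherers.windowSliding(2)’ can produce pairs of neighboring closes, so no custom state is needed at all. The sketch works on plain ‘BigDecimal’ closes and leaves out the dates and the cut-off filter; the class and method names are hypothetical. A stream with fewer elements than the window size yields one shorter window, which the filter removes.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.List;
import java.util.stream.Gatherers;

class SlidingPercentages {

    // Each close as a percentage of the previous close, computed from neighbor pairs.
    static List<BigDecimal> closePercentages(List<BigDecimal> closes) {
        return closes.stream()
                .gather(Gatherers.windowSliding(2))  // [c0, c1], [c1, c2], ...
                .filter(pair -> pair.size() == 2)     // drop a shorter window from a 1-element stream
                .map(pair -> pair.get(1)
                        .divide(pair.get(0), 25, RoundingMode.HALF_EVEN)
                        .multiply(BigDecimal.valueOf(100L)))
                .toList();
    }

    public static void main(String[] args) {
        List<BigDecimal> closes = List.of(
                new BigDecimal("100"), new BigDecimal("110"), new BigDecimal("99"));
        closePercentages(closes)
                .forEach(p -> System.out.println(p.stripTrailingZeros().toPlainString()));
        // 110
        // 90
    }
}
```

The trade-off is that every window allocates a small list, while the custom Gatherer only keeps the previous close as its state.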
The reasons to update to Java 25 LTS are:
- Thread pinning issue of Virtual Threads is mitigated -> better scalability
- Ahead-of-Time Class Loading & Linking -> faster application startup for large applications
- Stream Gatherers -> cleaner code, better optimization potential (no side effects)