Reference: https://www.amazon.com/Modern-Java-Action-functional-programming/dp/1617293563
- We can ask a shop for the price of a product (by its id)

      class Shop {
          Price getPrice(int id) {
              Delay.delay(); // simulate a slow remote call
              return Price.of(id);
          }
      }

- Shop answers with some delay (200 ms)

      class Delay {
          static void delay() {
              try {
                  Thread.sleep(200);
              } catch (InterruptedException e) {
                  // restore the interrupt flag instead of swallowing it
                  Thread.currentThread().interrupt();
              }
          }
      }

- we want to ask the shop for many ids (for example, we have a stream of ids)
- naive approach - scales badly
- one product, time: 203 ms

      // the upper bound of range is exclusive, so range(1, 2) yields only id 1
      var priceFutures = IntStream.range(1, 2)
              .parallel()
              .mapToObj(id -> CompletableFuture.supplyAsync(
                      () -> shop.getPrice(id)))
              .collect(toList());

      var prices = priceFutures.stream()
              .parallel()
              .map(CompletableFuture::join)
              .collect(toList());

- four products, time: 203 ms

      // note: range(1, 4) yields ids 1..3; use rangeClosed(1, 4) for exactly four ids
      var priceFutures = IntStream.range(1, 4)
              .parallel()
              .mapToObj(id -> CompletableFuture.supplyAsync(
                      () -> shop.getPrice(id)))
              .collect(toList());

      var prices = priceFutures.stream()
              .parallel()
              .map(CompletableFuture::join)
              .collect(toList());

- many products, time: 2 s

      // range(1, 30) yields 29 ids
      var priceFutures = IntStream.range(1, 30)
              .parallel()
              .mapToObj(id -> CompletableFuture.supplyAsync(
                      () -> shop.getPrice(id)))
              .collect(toList());

      var prices = priceFutures.stream()
              .parallel()
              .map(CompletableFuture::join)
              .collect(toList());

- dedicated executor - scales much better, time: 668 ms

      var size = 300;
      // daemon threads do not prevent the JVM from exiting
      var executor = Executors.newFixedThreadPool(Math.min(size, 100), r -> {
          Thread t = new Thread(r);
          t.setDaemon(true);
          return t;
      });

      // range(1, size) yields 299 ids; each task runs on the dedicated executor
      var priceFutures = IntStream.range(1, size)
              .parallel()
              .mapToObj(id -> CompletableFuture.supplyAsync(
                      () -> shop.getPrice(id), executor))
              .collect(toList());

      var prices = priceFutures.parallelStream()
              .map(CompletableFuture::join)
              .collect(toList());

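When no executor is passed, CompletableFuture.supplyAsync and parallel streams share the same common pool (ForkJoinPool.commonPool()). Its size is fixed and derived from Runtime.getRuntime().availableProcessors() (by default, one less than that value), so when there are many blocking tasks, most of them wait for a free thread.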
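To see why the naive version slows down as the number of ids grows, it can help to print the common pool's parallelism next to the core count and to estimate how many "rounds" of 200 ms calls are needed. The following is only a rough sketch: the class name `CommonPoolInfo` and the helper `estimatedMillis` are hypothetical, and the estimate ignores scheduling overhead and any help from the calling thread.

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolInfo {

    // Parallelism of the shared pool used by parallel streams and by
    // CompletableFuture.supplyAsync(...) when no executor is passed.
    static int commonPoolParallelism() {
        return ForkJoinPool.commonPool().getParallelism();
    }

    // Rough estimate (an assumption, not a measurement): blocking tasks run
    // in rounds of at most `threads` tasks, each round taking `delayMillis`.
    static long estimatedMillis(int tasks, int threads, long delayMillis) {
        int rounds = (tasks + threads - 1) / threads; // ceiling division
        return rounds * delayMillis;
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        int parallelism = Math.max(commonPoolParallelism(), 1);
        System.out.println("cores = " + cores
                + ", common pool parallelism = " + parallelism);
        System.out.println("29 tasks of 200 ms need roughly "
                + estimatedMillis(29, parallelism, 200) + " ms");
    }
}
```

The printed values depend on the machine; with only a handful of pool threads, 29 blocking calls need several rounds, which matches the order of magnitude of the 2 s measurement above.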
So we prepare a dedicated executor for the CompletableFuture tasks. How did we estimate the number of threads in the fixed pool? From the following formula:

    Nthreads = NCPU * UCPU * (1 + W/C)

- NCPU is the number of cores, available through Runtime.getRuntime().availableProcessors()
- UCPU is the target CPU utilization (between 0 and 1), and
- W/C is the ratio of wait time to compute time
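
As a worked example of the formula, the sketch below computes the estimate and then caps it, mirroring the Math.min(size, 100) used for the dedicated executor. The class name `PoolSizing`, both helper methods, and the W/C ratio of 100 are assumptions for illustration (a 200 ms wait against very little compute time); measure your own workload before relying on these numbers.

```java
public class PoolSizing {

    // Nthreads = NCPU * UCPU * (1 + W/C), rounded to the nearest integer.
    static int estimateThreads(int cores, double targetUtilization,
                               double waitToComputeRatio) {
        return (int) Math.round(cores * targetUtilization * (1 + waitToComputeRatio));
    }

    // Never create more threads than tasks, and never exceed a hard limit.
    static int poolSize(int tasks, int estimate, int hardLimit) {
        return Math.max(1, Math.min(tasks, Math.min(estimate, hardLimit)));
    }

    public static void main(String[] args) {
        int cores = Runtime.getRuntime().availableProcessors();
        // Assumed values: 100% target utilization, W/C of about 100.
        int estimate = estimateThreads(cores, 1.0, 100.0);
        System.out.println("estimated threads = " + estimate);
        System.out.println("pool size for 300 tasks = " + poolSize(300, estimate, 100));
    }
}
```

For example, with 4 cores, UCPU = 1 and W/C = 100 the formula gives 4 * 1 * 101 = 404 threads, which the cap of 100 then reduces to a pool of 100 threads.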