Leveraging CompletableFutures to handle your query results asynchronously David Gómez G. Developer Advocate
Concurrency vs Parallelism
Same type of tasks Workload split between similar workers Have the work finished before Parallelism Concurrency Different type of tasks Usually do some works while waiting for other tasks to be completed. Have the work finished before @DGOMEZG
Java and Concurrency
Lots of tools since Java 1 Java has (basic) building blocks for Concurrency and parallelism since Java 1. Even when most desktop computers were single-core @DGOMEZG
Threads Very Expensive Thread t = new Thread(task); t.start(); t.join(); Easily forgotten Blocking call
ThreadPools @DGOMEZG
ThreadPools @DGOMEZG
ThreadPools @DGOMEZG
ThreadPools @DGOMEZG
ThreadPools @DGOMEZG
Since Java 5 Concurrency & parallelism support improved in Java 5 Lots of New Concurrency constructors New concurrent-friendly Data Structures Revisited and improved in Java 7, 8, 9 … More coming up as part of loom project @DGOMEZG
Running tasks asynchronously
ExecutorService And now we can submit: - tasks that do not return anything. Runnable - tasks that return a result. Callable<R> Executor Service returns a Future<R> @DGOMEZG
ExecutorService & Future<T> ExecutorService executorService = Executors.newCachedThreadPool(); Future<String> taskResult = executorService.submit(longRunningTaskWithResult); String s = taskResult.get(); Blocking call
ExecutorService & Future<T> ExecutorService executorService = Executors.newCachedThreadPool(); Future<String> taskResult = executorService.submit(longRunningTaskWithResult); while (!taskResult.isDone()) { doSomethingElse(); } String s = taskResult.get(); False while task is not cancelled nor fi nished Improving performance by doing other tasks.
Running Multiple T asks List<Future<String > > taskResults = new ArrayList < > (); for (Callable<String> task : tasks) { taskResults.add(executorService.submit(task)); } for (Future<String> result : taskResults) { processResult(result.get()); } Bene fi t only between submitting all tasks and getting the results
Running Heterogeneous T asks List<Future<String > > taskResults = new ArrayList < > (); for (Callable<String> task : tasks) { taskResults.add(executorService.submit(task)); } for (Future<String> result : taskResults) { processResult(result.get()); } What if task1 is too slow while task2 is fast? What if we need the result of task1 to submit task2? (CompletionService returns tasks in the order they are fi nished)
CompletableFuture
CompletableFuture<T> Introduced in Java8 Has specific methods to submit tasks to an ExecutorService Implements a fluent API (CompletionStage) that allows chaining and combining dependant tasks. Allow to use different sized ExecutorServices for different tasks. @DGOMEZG
Async with CompletableFuture CompletableFuture.runAsync(getLongRunningTaskWithNoResult());
Async with CompletableFuture CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse();
Async with CompletableFuture CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); voidCF.get(); / / Wait until background task is finished
Async with CompletableFuture CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); voidCF.get(); / / Wait until background task is finished private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
Async with CompletableFuture private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); Shared with the JVM (may affect parallel streams) Sized for CPU intensive tasks (if our task is IO bound, we may affect app performance) Daemon Threads (if our main thread finishes, tasks may not be executed) Same that new Thread(task).start();
Async with CompletableFuture CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); voidCF.get(); / / Wait until background task is finished private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); voidCF.get(); / / Wait until background task is finished
Async with CompletableFuture CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished ExecutorService executorService = Executors.newCachedThreadPool(); executorService Sized and used for the expected % of CPU used by your tasks
Async with CompletableFuture CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished ExecutorService executorService = Executors.newCachedThreadPool(); executorService
CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished NoResult() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService
CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished NoResult() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService Result()
CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished Result() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService
CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished sup Result() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService Void> voidCF = Async(getLongRunningTaskWithResult()
CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished sup Async(getLongRunningTaskWithResult() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService Void> voidCF = ply
CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); result.get(); / / Wait until background task is finished String> result = sup Async(getLongRunningTaskWithResult() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService ply
CompletableFuture<T> It may seem just an alternative to Future<T> public class CompletableFuture<T> implements Future<T>, CompletionStage<T> CompletionStage<T> @DGOMEZG
CompletionStage<T> CompletableFuture<T> Fluent API to chain, compose and combine Futures. @DGOMEZG
Chaining tasks (blocking) void invokeSlowProcess() { Order order = orderService.getOrder(); customerChecker.authorized(order.customer()); } @DGOMEZG executed by main in PT7.045928S invokeSlowServiceDependantTasks();
Chaining tasks with CompletableFuture @DGOMEZG CompletableFuture .supplyAsync( () - > orderService.getOrder() ).thenAcceptAsync( order - > customerChecker.authorized(order.customer()) ); Submitted by main in PT0.058982S Executed by ForkJoinPool.commonPool-worker-1 in PT2.008558S Executed by ForkJoinPool.commonPool-worker-2 in PT5.010714S
Chaining parallel tasks @DGOMEZG
Chaining tasks var orderCF = CompletableFuture.supplyAsync( () - > orderService.getOrder() ); orderCF.thenAcceptAsync( order - > activityTracker.orderAccessed(order))); orderCF.thenAcceptAsync( order - > customerChecker.authorized(order.customer()))); @DGOMEZG Submitted by main in PT0.071653S getOrder executed by ForkJoinPool.commonPool-worker-1 in PT2.006599S activityTracker executed by ForkJoinPool.commonPool-worker-3 in PT2.009098S customerChecker executed by ForkJoinPool.commonPool-worker-2 in PT5.007562S
Chaining multiple parallel tasks @DGOMEZG
Chaining multiple parallel tasks @DGOMEZG var orderCF = CompletableFuture.supplyAsync( () - > orderService.getOrder() ); orderCF.thenAcceptAsync( order - > customerChecker.authorized(order.customer()))); orderWithCF.thenAcceptAsync(order - > { order.items().stream() .map(orderItem - > CompletableFuture.supplyAsync( () - > inventoryService.checkAvailability( orderItem.productId(), orderItem.quantity())) ).collect(Collectors.toList()) .forEach(CompletableFuture : : join); });
CompletableFuture<T> Fluent API to chain, compose and combine Futures. CompletionStage<T> acceptEither applyToEither runAfterEither runAfterBoth thenAccept thenAcceptBoth thenRun thenCombine thenCompose whenAccept @DGOMEZG
CompletableFuture In your API
Why should you consider using CompletableFuture in you API? Enable your users to maximise the performance to your queries. And keep control on how you execute the asynchronous tasks. Benefit from how other frameworks handle CompletableFutures (i. e Spring) @DGOMEZG
Creating and managing CompletableFutures CompletableFuture<String> completableFuture = new CompletableFuture < > (); @DGOMEZG
Creating and managing CompletableFutures @DGOMEZG
Creating and managing CompletableFutures @DGOMEZG
@DGOMEZG An Example: QueryBus at AxonFramework Queries UI / API Commands Command Model Projections Events
An Example: QueryBus at AxonFramework UI / API Projections queryMessage @DGOMEZG QueryBus
QueryBusImpl (simplified) public CompletableFuture<QueryResponseMessage> query(QueryMessage query) { / / Create your CompletableFuture / / Prepare to Run asynchronously / / get Results/Exceptions to complete the future / / return your completableFuture }
QueryBusImpl (simplified) public CompletableFuture<QueryResponseMessage> query(QueryMessage query) { / / Create your CompletableFuture CompletableFuture<QueryResponseMessage> queryTransaction = new CompletableFuture < > (); / / Prepare to Run asynchronously / / get Results/Exceptions to complete the future / / return your completableFuture return queryTransaction; }
QueryBusImpl (simplified) public CompletableFuture<QueryResponseMessage> query(QueryMessage query) { CompletableFuture<QueryResponseMessage> queryTransaction = new CompletableFuture < > (); try { / / Prepare to Run asynchronously / / get Results to complete the future } catch (Exception e) { queryTransaction.completeExceptionally(exception); } return queryTransaction; }
QueryBusImpl (simplified) public CompletableFuture<QueryResponseMessage> query(QueryMessage query) { CompletableFuture<QueryResponseMessage> queryTransaction = new CompletableFuture < > (); try { ResultStream<QueryResponse> result = axonServerConnectionManager.getConnection(targetContext) .query(query); / * . . . * / result.onAvailable(() - > queryTransaction.complete(result.nextIfAvailable()) } catch (Exception e) { queryTransaction.completeExceptionally(exception); } return queryTransaction; }
Benefits It allows to compose asynchronous query calls to different systems @DGOMEZG private final QueryGateway queryGateway; @GetMapping("orders/{OrderId}/status") public Future<OrderStatus> orderStatus(@PathVariable String orderId) { CompletableFuture<OrderStatus> orderStatusCF = queryGateway.query(new OrderStatusQuery(orderId), new InstanceResponseType < > (OrderStatus.class)); / / Invoke other external services asynchronously / / Composing the tasks with CompletionStage.* / / Useful in processes when applying the "Strangling The monolith Pattern” / / Probably not very usual when using DDD return orderStatusCF; }
Benefits SpringFramework knows how to return a Future<T> Tomcat (and most application Servers) can handle better the Threads in the ThreadPool @DGOMEZG private final QueryGateway queryGateway; @GetMapping(“orders/{OrderId}/status") public Future<OrderStatus> orderStatus(@PathVariable String orderId) { return CompletableFuture<OrderStatus> orderStatusCF = queryGateway.query(new OrderStatusQuery(orderId), new InstanceResponseType < > (OrderStatus.class)); }
Conclusions
Conclusions CompletableFutures are a powerful tool to compose and combine Asynchronous execution of heterogeneous tasks Best if used with specific ExecutorService instances, properly sized based depending on amount of I/O-Wait time Adding CompletableFutures on you Async API will give more powers to the developers using your library/framework. @DGOMEZG
Where do I go from here? https://github.com/AxonFramework/AxonFramework Quick Start Tutorial. 
 https://docs.axoniq.io/reference-guide/getting-started/quick-start Free Courses on DDD, CQRS, Event-Sourcing 
 https://academy.axoniq.io/ https:/ /lp.axoniq.io/devnexus-22 The slides JavaSpecialists newsletter Java Concurrency Specialist Courses

Leverage CompletableFutures to handle async queries. DevNexus 2022

  • 1.
    Leveraging CompletableFutures tohandle your query results asynchronously David Gómez G. Developer Advocate
  • 2.
  • 3.
    Same type oftasks Workload split between similar workers Have the work finished before Parallelism Concurrency Different type of tasks Usually do some works while waiting for other tasks to be completed. Have the work finished before @DGOMEZG
  • 4.
  • 5.
    Lots of toolssince Java 1 Java has (basic) building blocks for Concurrency and parallelism since Java 1. Even when most desktop computers were single-core @DGOMEZG
  • 6.
    Threads Very Expensive Thread t= new Thread(task); t.start(); t.join(); Easily forgotten Blocking call
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
    Since Java 5 Concurrency& parallelism support improved in Java 5 Lots of New Concurrency constructors New concurrent-friendly Data Structures Revisited and improved in Java 7, 8, 9 … More coming up as part of loom project @DGOMEZG
  • 13.
  • 14.
    ExecutorService And now wecan submit: - tasks that do not return anything. Runnable - tasks that return a result. Callable<R> Executor Service returns a Future<R> @DGOMEZG
  • 15.
    ExecutorService & Future<T> ExecutorServiceexecutorService = Executors.newCachedThreadPool(); Future<String> taskResult = executorService.submit(longRunningTaskWithResult); String s = taskResult.get(); Blocking call
  • 16.
    ExecutorService & Future<T> ExecutorServiceexecutorService = Executors.newCachedThreadPool(); Future<String> taskResult = executorService.submit(longRunningTaskWithResult); while (!taskResult.isDone()) { doSomethingElse(); } String s = taskResult.get(); False while task is not cancelled nor fi nished Improving performance by doing other tasks.
  • 17.
    Running Multiple T asks List<Future<String > > taskResults= new ArrayList < > (); for (Callable<String> task : tasks) { taskResults.add(executorService.submit(task)); } for (Future<String> result : taskResults) { processResult(result.get()); } Bene fi t only between submitting all tasks and getting the results
  • 18.
    Running Heterogeneous T asks List<Future<String > > taskResults= new ArrayList < > (); for (Callable<String> task : tasks) { taskResults.add(executorService.submit(task)); } for (Future<String> result : taskResults) { processResult(result.get()); } What if task1 is too slow while task2 is fast? What if we need the result of task1 to submit task2? (CompletionService returns tasks in the order they are fi nished)
  • 19.
  • 20.
    CompletableFuture<T> Introduced in Java8 Hasspecific methods to submit tasks to an ExecutorService Implements a fluent API (CompletionStage) that allows chaining and combining dependant tasks. Allow to use different sized ExecutorServices for different tasks. @DGOMEZG
  • 21.
  • 22.
    Async with CompletableFuture CompletableFuture<Void>voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse();
  • 23.
    Async with CompletableFuture CompletableFuture<Void>voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); voidCF.get(); / / Wait until background task is finished
  • 24.
    Async with CompletableFuture CompletableFuture<Void>voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); voidCF.get(); / / Wait until background task is finished private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
  • 25.
    Async with CompletableFuture privatestatic final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor(); Shared with the JVM (may affect parallel streams) Sized for CPU intensive tasks (if our task is IO bound, we may affect app performance) Daemon Threads (if our main thread finishes, tasks may not be executed) Same that new Thread(task).start();
  • 26.
    Async with CompletableFuture CompletableFuture<Void>voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); voidCF.get(); / / Wait until background task is finished private static final Executor ASYNC_POOL = USE_COMMON_POOL ? ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
  • 27.
    Async with CompletableFuture ExecutorServiceexecutorService = Executors.newCachedThreadPool(); executorService CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult()); doSomethingElse(); voidCF.get(); / / Wait until background task is finished
  • 28.
    Async with CompletableFuture CompletableFuture<Void>voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished ExecutorService executorService = Executors.newCachedThreadPool(); executorService Sized and used for the expected % of CPU used by your tasks
  • 29.
    Async with CompletableFuture CompletableFuture<Void>voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished ExecutorService executorService = Executors.newCachedThreadPool(); executorService
  • 30.
    CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished NoResult() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService
  • 31.
    CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished NoResult() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService Result()
  • 32.
    CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished Result() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService
  • 33.
    CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished sup Result() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService Void> voidCF = Async(getLongRunningTaskWithResult()
  • 34.
    CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); voidCF.get(); / / Wait until background task is finished sup Async(getLongRunningTaskWithResult() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService Void> voidCF = ply
  • 35.
    CompletableFuture<Void> voidCF = CompletableFuture.runAsync(getLongRunningTaskWithNoResult() , ); doSomethingElse(); result.get(); / / Wait until background task is finished String> result = sup Async(getLongRunningTaskWithResult() Async with CompletableFuture ExecutorService executorService = Executors.newCachedThreadPool(); executorService ply
  • 36.
    CompletableFuture<T> It may seemjust an alternative to Future<T> public class CompletableFuture<T> implements Future<T>, CompletionStage<T> CompletionStage<T> @DGOMEZG
  • 37.
    CompletionStage<T> CompletableFuture<T> Fluent API tochain, compose and combine Futures. @DGOMEZG
  • 38.
    Chaining tasks (blocking) voidinvokeSlowProcess() { Order order = orderService.getOrder(); customerChecker.authorized(order.customer()); } @DGOMEZG executed by main in PT7.045928S invokeSlowServiceDependantTasks();
  • 39.
    Chaining tasks withCompletableFuture @DGOMEZG CompletableFuture .supplyAsync( () - > orderService.getOrder() ).thenAcceptAsync( order - > customerChecker.authorized(order.customer()) ); Submitted by main in PT0.058982S Executed by ForkJoinPool.commonPool-worker-1 in PT2.008558S Executed by ForkJoinPool.commonPool-worker-2 in PT5.010714S
  • 40.
  • 41.
    Chaining tasks var orderCF= CompletableFuture.supplyAsync( () - > orderService.getOrder() ); orderCF.thenAcceptAsync( order - > activityTracker.orderAccessed(order))); orderCF.thenAcceptAsync( order - > customerChecker.authorized(order.customer()))); @DGOMEZG Submitted by main in PT0.071653S getOrder executed by ForkJoinPool.commonPool-worker-1 in PT2.006599S activityTracker executed by ForkJoinPool.commonPool-worker-3 in PT2.009098S customerChecker executed by ForkJoinPool.commonPool-worker-2 in PT5.007562S
  • 42.
  • 43.
    Chaining multiple paralleltasks @DGOMEZG var orderCF = CompletableFuture.supplyAsync( () - > orderService.getOrder() ); orderCF.thenAcceptAsync( order - > customerChecker.authorized(order.customer()))); orderWithCF.thenAcceptAsync(order - > { order.items().stream() .map(orderItem - > CompletableFuture.supplyAsync( () - > inventoryService.checkAvailability( orderItem.productId(), orderItem.quantity())) ).collect(Collectors.toList()) .forEach(CompletableFuture : : join); });
  • 44.
    CompletableFuture<T> Fluent API tochain, compose and combine Futures. CompletionStage<T> acceptEither applyToEither runAfterEither runAfterBoth thenAccept thenAcceptBoth thenRun thenCombine thenCompose whenAccept @DGOMEZG
  • 45.
  • 46.
    Why should youconsider using CompletableFuture in you API? Enable your users to maximise the performance to your queries. And keep control on how you execute the asynchronous tasks. Benefit from how other frameworks handle CompletableFutures (i. e Spring) @DGOMEZG
  • 47.
    Creating and managingCompletableFutures CompletableFuture<String> completableFuture = new CompletableFuture < > (); @DGOMEZG
  • 48.
    Creating and managingCompletableFutures @DGOMEZG
  • 49.
    Creating and managingCompletableFutures @DGOMEZG
  • 50.
    @DGOMEZG An Example: QueryBusat AxonFramework Queries UI / API Commands Command Model Projections Events
  • 51.
    An Example: QueryBusat AxonFramework UI / API Projections queryMessage @DGOMEZG QueryBus
  • 52.
    QueryBusImpl (simplified) public CompletableFuture<QueryResponseMessage>query(QueryMessage query) { / / Create your CompletableFuture / / Prepare to Run asynchronously / / get Results/Exceptions to complete the future / / return your completableFuture }
  • 53.
    QueryBusImpl (simplified) public CompletableFuture<QueryResponseMessage>query(QueryMessage query) { / / Create your CompletableFuture CompletableFuture<QueryResponseMessage> queryTransaction = new CompletableFuture < > (); / / Prepare to Run asynchronously / / get Results/Exceptions to complete the future / / return your completableFuture return queryTransaction; }
  • 54.
    QueryBusImpl (simplified) public CompletableFuture<QueryResponseMessage>query(QueryMessage query) { CompletableFuture<QueryResponseMessage> queryTransaction = new CompletableFuture < > (); try { / / Prepare to Run asynchronously / / get Results to complete the future } catch (Exception e) { queryTransaction.completeExceptionally(exception); } return queryTransaction; }
  • 55.
    QueryBusImpl (simplified) public CompletableFuture<QueryResponseMessage>query(QueryMessage query) { CompletableFuture<QueryResponseMessage> queryTransaction = new CompletableFuture < > (); try { ResultStream<QueryResponse> result = axonServerConnectionManager.getConnection(targetContext) .query(query); / * . . . * / result.onAvailable(() - > queryTransaction.complete(result.nextIfAvailable()) } catch (Exception e) { queryTransaction.completeExceptionally(exception); } return queryTransaction; }
  • 56.
    Benefits It allows tocompose asynchronous query calls to different systems @DGOMEZG private final QueryGateway queryGateway; @GetMapping("orders/{OrderId}/status") public Future<OrderStatus> orderStatus(@PathVariable String orderId) { CompletableFuture<OrderStatus> orderStatusCF = queryGateway.query(new OrderStatusQuery(orderId), new InstanceResponseType < > (OrderStatus.class)); / / Invoke other external services asynchronously / / Composing the tasks with CompletionStage.* / / Useful in processes when applying the "Strangling The monolith Pattern” / / Probably not very usual when using DDD return orderStatusCF; }
  • 57.
    Benefits SpringFramework knows howto return a Future<T> Tomcat (and most application Servers) can handle better the Threads in the ThreadPool @DGOMEZG private final QueryGateway queryGateway; @GetMapping(“orders/{OrderId}/status") public Future<OrderStatus> orderStatus(@PathVariable String orderId) { return CompletableFuture<OrderStatus> orderStatusCF = queryGateway.query(new OrderStatusQuery(orderId), new InstanceResponseType < > (OrderStatus.class)); }
  • 58.
  • 59.
    Conclusions CompletableFutures are apowerful tool to compose and combine Asynchronous execution of heterogeneous tasks Best if used with specific ExecutorService instances, properly sized based depending on amount of I/O-Wait time Adding CompletableFutures on you Async API will give more powers to the developers using your library/framework. @DGOMEZG
  • 60.
    Where do Igo from here? https://github.com/AxonFramework/AxonFramework Quick Start Tutorial. 
 https://docs.axoniq.io/reference-guide/getting-started/quick-start Free Courses on DDD, CQRS, Event-Sourcing 
 https://academy.axoniq.io/ https:/ /lp.axoniq.io/devnexus-22 The slides JavaSpecialists newsletter Java Concurrency Specialist Courses