Asynchronous API in Java 8: how to use CompletableFuture
The document discusses asynchronous programming in Java, focusing on the use of CompletableFuture and CompletionStage to handle tasks efficiently. It outlines different execution models (synchronous, multithreaded, asynchronous) and provides examples of creating and testing asynchronous tasks using Java concepts. Additionally, it highlights the advantages of non-blocking operations and patterns for chaining and combining tasks.
@JosePaumard #Devoxx #J8Async

Asynchronous
Difference with the synchronous multithreaded model?
1) The async engine decides to switch from one context to another
2) Single threaded = no issue with atomicity or visibility
Performance? No multithreaded « context switch »

Asynchronous
Callback or task: lambda expression
When the result is available, then we can continue with the next task
Now how can we write that in Java?
Pattern:

    queryEngine.select("select user from User")
               .forEach(user -> System.out.println(user));

A task in Java
Since Java 1: Runnable
Since Java 5: Callable
In Java 5 we have the ExecutorService (pool of threads)
We give a task and get back a Future

A task in Java
Pattern:

    // assumes a synchronous select() that returns a List<User>
    Callable<List<User>> task = () -> queryEngine.select("select user from User");
    Future<List<User>> future = executorService.submit(task);
    List<User> users = future.get(); // blocking
    users.forEach(System.out::println);

Passing an object from one task to another has to be handled in the « master » thread
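
The blocking pattern above can be tried with the JDK alone. This is a minimal sketch: `selectUsers()` and the user names are hypothetical stand-ins for the query, not something from the slides.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class BlockingFutureSketch {

    // Hypothetical stand-in for the query; not part of the original slides.
    static List<String> selectUsers() {
        return Arrays.asList("alice", "bob");
    }

    static List<String> runQuery() {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        try {
            Callable<List<String>> task = BlockingFutureSketch::selectUsers;
            Future<List<String>> future = executorService.submit(task);
            // The calling thread blocks here until the worker thread is done.
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            executorService.shutdown();
        }
    }

    public static void main(String[] args) {
        runQuery().forEach(System.out::println);
    }
}
```

The result only becomes usable in the thread that calls `get()`, which is the limitation the slide points at.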

Creation of an asynchronous task
The Jersey way to create an asynchronous call

    @Path("/resource")
    public class AsyncResource {

        @GET
        public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
            new Thread(new Runnable() {
                public void run() {
                    String result = longOperation();
                    asyncResponse.resume(result);
                }
            }).start();
        }
    }

Creation of an asynchronous task
(let us fix this code, this is Java 8)

    @Path("/resource")
    public class AsyncResource {

        @Inject private Executor executor;

        @GET
        public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
            executor.execute(() -> {
                String result = longOperation();
                asyncResponse.resume(result);
            });
        }
    }

How to test it?
The question is: how can we test that code?
We want to check if the result object is passed to the resume() method of the asyncResponse

How to test it?
We have mocks for that!
It is a very basic test, but tricky to write since we are in an asynchronous world

How to test it?
Let us give one more look at the code

    @Path("/resource")
    public class AsyncResource {

        @GET
        public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
            executor.execute(() -> {               // execute() is called in the main thread
                String result = longOperation();   // the lambda body is executed in another thread
                asyncResponse.resume(result);
            });
        }
    }

How to test it?
We have mocks to check if resume() is properly called with result
It is a very basic test, but tricky to write since we are in an asynchronous world

How to test it?
We can inject a mock AsyncResponse, even mock the result

    @Path("/resource")
    public class AsyncResource {

        @GET
        public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
            executor.execute(() -> {
                String result = longOperation();
                asyncResponse.resume(result);
            });
        }
    }

How to test it?
We can inject a mock AsyncResponse, even mock the result
Then verify the correct interaction:

    Mockito.verify(mockAsyncResponse).resume(result);

But:
- we need to verify this once the run() method has been called
- and take into account the multithreaded aspect… the reads / writes on the mock should be « visible »!

How to test it?
So our constraints are the following:
- we need to verify this once the run() method has been called
- we need to read / write on our mocks in the same thread as the one which runs the task we want to test

How to test it?
This is where CompletionStage comes to the rescue!

    @Path("/resource")
    public class AsyncResource {

        @Inject ExecutorService executor;

        @GET
        public void asyncGet(@Suspended final AsyncResponse asyncResponse) {
            executor.submit(() -> {
                String result = longOperation();
                asyncResponse.resume(result);
            });
        }
    }

How to test it?
This pattern:

    executor.submit(() -> {
        String result = longOperation();
        asyncResponse.resume(result);
    });

Becomes this one:

    CompletableFuture.runAsync(() -> {
        String result = longOperation();
        asyncResponse.resume(result);
    }, executor);

And does basically the same thing

How to test it?
But the nice thing is:

    CompletableFuture<Void> completableFuture =
        CompletableFuture.runAsync(() -> {
            String result = longOperation();
            asyncResponse.resume(result);
        }, executor);

And on this object we can call:

    completableFuture
        .thenRun(() -> {
            Mockito.verify(mockAsyncResponse).resume(result);
        });

How to test it?
Be careful of visibility issues
1) It’s simpler to run everything in the same thread
2) Create, train and check our mocks in this thread
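
The "run the task, then check in the same thread" idea can be sketched without Mockito. Everything named here is an assumption for illustration: `RecordingResponse` is a hand-written fake standing in for a mocked `AsyncResponse`, and the check uses `thenRunAsync` with the same single-thread executor, a deliberate variation on the slide's `thenRun` that pins the check to that executor's thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;

public class SameThreadCheckSketch {

    // Hypothetical fake for AsyncResponse: it only records what resume() received.
    static final class RecordingResponse {
        final AtomicReference<String> resumedWith = new AtomicReference<>();
        void resume(String value) { resumedWith.set(value); }
    }

    static String longOperation() { return "result"; }

    static String callThenCheck() {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        RecordingResponse response = new RecordingResponse();
        try {
            CompletableFuture
                .runAsync(() -> response.resume(longOperation()), executor)
                // Queued on the same single-thread executor, so the check runs
                // after the task and in the same thread.
                .thenRunAsync(() -> {
                    if (!"result".equals(response.resumedWith.get())) {
                        throw new AssertionError("resume() was not called with the result");
                    }
                }, executor)
                .join(); // a failed check surfaces here as a CompletionException
            return response.resumedWith.get();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(callThenCheck());
    }
}
```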

CompletionStage / CompletableFuture
Two elements in this API:
- an interface: CompletionStage
- an implementing class: CompletableFuture
The interface depends on CompletableFuture:

    public CompletableFuture<T> toCompletableFuture();

What is a CompletionStage?
A model for a task:
- that performs an action and may return a value when another completion stage completes
- that may trigger other tasks
So a completion stage is an element of a chain

What is a CompletableFuture?
A class that implements both Future and CompletionStage
It has a state:
- the task may be running
- the task may have completed normally
- the task may have completed exceptionally
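
The three states can be observed with `isDone()` and `isCompletedExceptionally()`. A small sketch, where the `describe` helper and its labels are made up for illustration (note that "not completed yet" also covers a task that has not started):

```java
import java.util.concurrent.CompletableFuture;

public class CompletionStateSketch {

    // Hypothetical helper that maps a future to one of the three states.
    static String describe(CompletableFuture<?> cf) {
        if (!cf.isDone()) {
            return "not completed yet";
        }
        return cf.isCompletedExceptionally() ? "completed exceptionally" : "completed normally";
    }

    public static void main(String[] args) {
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<String> ok = CompletableFuture.completedFuture("done");
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(describe(pending));
        System.out.println(describe(ok));
        System.out.println(describe(failed));
    }
}
```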

Methods from Future
Five methods:

    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get();                                   // blocking call
    V get(long timeout, TimeUnit timeUnit)     // may throw a checked exception
        throws InterruptedException, ExecutionException, TimeoutException;

More from CompletableFuture
Future-like methods:

    V join();                     // may throw an unchecked exception
    V getNow(V valueIfAbsent);    // returns immediately

    boolean complete(V value);    // sets the returned value if it is not already set
    void obtrudeValue(V value);   // resets the returned value

    boolean completeExceptionally(Throwable t);   // sets an exception
    void obtrudeException(Throwable t);           // resets with an exception
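
A short sketch of how these methods behave on a future that is completed by hand. The class name and the values are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ManualCompletionSketch {

    static List<String> trace() {
        List<String> seen = new ArrayList<>();
        CompletableFuture<String> cf = new CompletableFuture<>();

        seen.add(cf.getNow("absent"));                    // not completed: the default is returned
        seen.add(String.valueOf(cf.complete("first")));   // true: this call completed the future
        seen.add(String.valueOf(cf.complete("second")));  // false: already completed, value unchanged
        seen.add(cf.join());                              // the value set by the first complete()
        cf.obtrudeValue("forced");                        // overwrites the value unconditionally
        seen.add(cf.join());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(trace()); // prints [absent, true, false, first, forced]
    }
}
```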

How to create a CompletableFuture?
A completed CompletableFuture

    public static <U> CompletableFuture<U> completedFuture(U value);

How to create a CompletableFuture?
A CompletableFuture from a Runnable or a Supplier

    public static CompletableFuture<Void>
        runAsync(Runnable runnable, Executor executor);

    public static <U> CompletableFuture<U>
        supplyAsync(Supplier<U> value, Executor executor);
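
The three factory methods side by side, as a minimal sketch. The pool size and the values are arbitrary choices for the example.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

public class CreationSketch {

    static String create() {
        ExecutorService executor = Executors.newFixedThreadPool(2);
        try {
            AtomicBoolean ran = new AtomicBoolean(false);

            // Already completed: no thread is involved.
            CompletableFuture<String> done = CompletableFuture.completedFuture("ready");
            // Runnable: no result, hence CompletableFuture<Void>.
            CompletableFuture<Void> run = CompletableFuture.runAsync(() -> ran.set(true), executor);
            // Supplier: the returned value becomes the result.
            CompletableFuture<Integer> supplied = CompletableFuture.supplyAsync(() -> 6 * 7, executor);

            run.join(); // wait for the Runnable before reading the flag
            return done.join() + " " + ran.get() + " " + supplied.join();
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(create()); // prints "ready true 42"
    }
}
```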

Building CompletionStage chains
A CompletionStage is a step in a chain
- it can be triggered by a previous CompletionStage
- it can trigger another CompletionStage
- it can be executed in a given Executor

Building CompletionStage chains
What kind of operation does it support?
- chaining (1 – 1)
- composing (1 – 1)
- combining, waiting for both results (2 – 1)
- combining, triggered on the first available result (2 – 1)
All this gives… 36 methods!
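
To make the two 1 – 1 operations concrete, here is a sketch that reads "chaining" as `thenApply` and "composing" as `thenCompose`. That mapping is an interpretation of the slide, and `findName` is a hypothetical asynchronous lookup.

```java
import java.util.concurrent.CompletableFuture;

public class ChainComposeSketch {

    // Hypothetical asynchronous lookup that itself returns a CompletableFuture.
    static CompletableFuture<String> findName(int id) {
        return CompletableFuture.supplyAsync(() -> "user-" + id);
    }

    // Chaining: the function maps a value to a value.
    static int chained() {
        return CompletableFuture.supplyAsync(() -> 21)
                .thenApply(n -> n * 2)
                .join();
    }

    // Composing: the function maps a value to another stage, which is flattened.
    static String composed() {
        return CompletableFuture.supplyAsync(() -> 7)
                .thenCompose(id -> findName(id))
                .join();
    }

    public static void main(String[] args) {
        System.out.println(chained());  // prints 42
        System.out.println(composed()); // prints user-7
    }
}
```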

Building CompletionStage chains
In what thread can it be executed?
- In the same executor as the caller
- In a new executor, passed as a parameter
- Asynchronously, i.e. in the common fork join pool
All this gives… 36 methods!
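
Passing an executor as a parameter can be observed by naming the threads. In this sketch the two single-thread executors and their names ("worker", "ui") are assumptions chosen for the demonstration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WhichThreadSketch {

    // Single-thread executor whose only thread carries the given name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> new Thread(runnable, name));
    }

    static String threadsUsed() {
        ExecutorService worker = named("worker");
        ExecutorService ui = named("ui");
        try {
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), worker)
                    .thenApplyAsync(first -> first + " -> " + Thread.currentThread().getName(), ui)
                    .join();
        } finally {
            worker.shutdown();
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(threadsUsed()); // prints "worker -> ui"
    }
}
```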

CompletionStage – patterns
Some 2 – 1 patterns

    public <U, V> CompletionStage<V> thenCombineAsync
        (CompletionStage<U> other, BiFunction<T, U, V> function);

    public <U> CompletionStage<Void> thenAcceptBoth
        (CompletionStage<U> other, BiConsumer<T, U> action);

    public CompletionStage<Void> runAfterBothAsync
        (CompletionStage<?> other, Runnable action, Executor executor);
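
A sketch of the "wait for both" family using the non-async variants `thenCombine`, `thenAcceptBoth` and `runAfterBoth`. The names and the log list are illustrative.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

public class CombineBothSketch {

    // thenCombine: both results are fed to a BiFunction that produces a value.
    static String fullName() {
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "Ada");
        CompletableFuture<String> last = CompletableFuture.supplyAsync(() -> "Lovelace");
        return first.thenCombine(last, (f, l) -> f + " " + l).join();
    }

    // thenAcceptBoth consumes both results; runAfterBoth ignores them.
    static List<String> sideEffects() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "Ada");
        CompletableFuture<String> last = CompletableFuture.supplyAsync(() -> "Lovelace");

        CompletableFuture<Void> accepted = first.thenAcceptBoth(last, (f, l) -> log.add(f + "+" + l));
        // Triggered only once "accepted" and "last" are both complete.
        accepted.runAfterBoth(last, () -> log.add("both done")).join();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(fullName());
        System.out.println(sideEffects());
    }
}
```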

CompletionStage – patterns
Some more 2 – 1 patterns

    public <U> CompletionStage<U> applyToEither
        (CompletionStage<? extends T> other, Function<T, U> function);

    public CompletionStage<Void> acceptEitherAsync
        (CompletionStage<? extends T> other, Consumer<? super T> consumer);

    public CompletionStage<Void> runAfterEitherAsync
        (CompletionStage<?> other, Runnable action, Executor executor);
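
The "first available result" behaviour is easiest to see when one of the two stages never completes. A minimal sketch, with a deliberately uncompleted future so that the outcome does not depend on timing:

```java
import java.util.concurrent.CompletableFuture;

public class EitherSketch {

    static String firstAvailable() {
        CompletableFuture<String> never = new CompletableFuture<>();           // never completed in this sketch
        CompletableFuture<String> fast = CompletableFuture.completedFuture("fast");
        // The function is applied to whichever result arrives first.
        return never.applyToEither(fast, value -> value.toUpperCase()).join();
    }

    public static void main(String[] args) {
        System.out.println(firstAvailable()); // prints FAST
    }
}
```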

Back to our first example
So the complete pattern becomes this one
1) First we create our mocks

    String result = Mockito.mock(String.class);
    AsyncResponse response = Mockito.mock(AsyncResponse.class);

    Runnable train = () -> Mockito.doReturn(result).when(response).longOperation();
    Runnable verify = () -> Mockito.verify(response).resume(result);

Back to our first example
So the complete pattern becomes this one
2) Then we create the call & verify

    Runnable callAndVerify = () -> {
        asyncResource.executeAsync(response).thenRun(verify);
    };

Back to our first example
So the complete pattern becomes this one
3) Then we create the task

    ExecutorService executor = Executors.newSingleThreadExecutor();

    AsyncResource asyncResource = new AsyncResource();
    asyncResource.setExecutorService(executor);

    CompletableFuture
        .runAsync(train, executor)    // this trains our mocks
        .thenRun(callAndVerify);      // this verifies our mocks

Since a CompletableFuture is also a Future, we can fail with a timeout if the test does not complete fast enough

    CompletableFuture
        .runAsync(train, executor)    // this trains our mocks
        .thenRun(callAndVerify)       // this verifies our mocks
        .get(10, TimeUnit.SECONDS);
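
The timeout behaviour of `get(timeout, unit)` can be checked in isolation. In this sketch the helper name `timesOut` and the 50 ms delay are arbitrary choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutSketch {

    // Returns true when the future did not complete within the given delay.
    static boolean timesOut(CompletableFuture<?> cf, long millis) {
        try {
            cf.get(millis, TimeUnit.MILLISECONDS);
            return false;
        } catch (TimeoutException e) {
            return true;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(timesOut(new CompletableFuture<Void>(), 50));          // prints true
        System.out.println(timesOut(CompletableFuture.completedFuture("ok"), 50)); // prints false
    }
}
```

In a test, letting the `TimeoutException` propagate is enough to make a stuck chain fail instead of hanging.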

A second example
Async analysis of a web page

    CompletableFuture.supplyAsync(
        () -> readPage("http://whatever.com/")
    )
    .thenApply(page -> linkParser.getLinks(page))
    .thenAccept(
        links -> displayPanel.display(links) // in the right thread!
    );

A second example
Async analysis of a web page

    CompletableFuture.supplyAsync(
        () -> readPage("http://whatever.com/")
    )
    .thenApply(page -> linkParser.getLinks(page))
    .thenAcceptAsync(
        links -> displayPanel.display(links),
        executor
    );

A second example
Async analysis of a web page

    public interface Executor {
        void execute(Runnable command);
    }

    Executor executor = runnable -> SwingUtilities.invokeLater(runnable);

A second example
Async analysis of a web page

    CompletableFuture.supplyAsync(
        () -> readPage("http://whatever.com/")
    )
    .thenApply(page -> linkParser.getLinks(page))
    .thenAcceptAsync(
        links -> displayPanel.display(links),
        runnable -> SwingUtilities.invokeLater(runnable)
    );

A second example
Async analysis of a web page

    CompletableFuture.supplyAsync(
        () -> readPage("http://whatever.com/")
    )
    .thenApply(Parser::getLinks)
    .thenAcceptAsync(
        DisplayPanel::display,
        SwingUtilities::invokeLater
    );
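
Because `Executor` has a single abstract method, any lambda of the right shape can decide where the last step runs. The sketch below replaces Swing with a hypothetical recording executor that runs the command in the calling thread, so it works without a GUI. The page content and the comma-based "parser" are made up.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

public class LambdaExecutorSketch {

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();

        // Stand-in for SwingUtilities::invokeLater: records the hand-off, then runs the command.
        Executor recording = command -> {
            events.add("handed to executor");
            command.run();
        };

        CompletableFuture
                .supplyAsync(() -> "a.html,b.html")                  // hypothetical page content
                .thenApply(page -> Arrays.asList(page.split(",")))   // hypothetical link parser
                .thenAcceptAsync(links -> events.add("display " + links), recording)
                .join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```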

A last example
Async events in CDI

    @Inject
    Event<String> event;

    event.fire("some event"); // returns void

    public void observes(@Observes String payload) {
        // handle the event, called in the firing thread
    }

A last example
Async events in CDI

    public void observes(@Observes String payload) {
        // handle the event, called in the firing thread
        CompletableFuture.anyOf(/* some task */);
    }

A last example
Async events in CDI

    @Inject
    Event<String> event;

    event.fireAsync("some event"); // returns CompletionStage<Object>

    public void observes(@ObservesAsync String payload) {
        // handle the event in another thread
    }

A last example
Async events in CDI

    @Inject
    Event<String> event;

    Executor executor = SwingUtilities::invokeLater;

    CompletionStage<Object> cs =
        event.fireAsync("some event", executor);

    cs.whenComplete(...); // handle the exceptions

CompletionStage – last patterns
Static methods

    public static CompletableFuture<Void>
        allOf(CompletableFuture<?>... cfs);

    public static CompletableFuture<Object>
        anyOf(CompletableFuture<?>... cfs);
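
A sketch of the two static methods. Since `allOf` returns a `CompletableFuture<Void>`, the individual results are read back from the original futures. The values are illustrative.

```java
import java.util.concurrent.CompletableFuture;

public class AllOfAnyOfSketch {

    static String waitForAll() {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "a");
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "b");
        // allOf() carries no value: once it completes, join() on a and b cannot block.
        return CompletableFuture.allOf(a, b)
                .thenApply(ignored -> a.join() + b.join())
                .join();
    }

    static Object waitForAny() {
        CompletableFuture<String> never = new CompletableFuture<>();            // never completed in this sketch
        CompletableFuture<String> done = CompletableFuture.completedFuture("done");
        // anyOf() completes with the first available result, typed as Object.
        return CompletableFuture.anyOf(never, done).join();
    }

    public static void main(String[] args) {
        System.out.println(waitForAll()); // prints ab
        System.out.println(waitForAny()); // prints done
    }
}
```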

Exception handling
CompletableFuture can handle exceptions
Which means that:
- the call to isCompletedExceptionally() returns true
- the call to get() throws an ExecutionException whose cause is the root Exception
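
Both points can be checked on a task that throws. A minimal sketch, where the exception type and message are arbitrary:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class ExceptionalCompletionSketch {

    static String failureReport() {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        try {
            cf.get();
            return "no failure";
        } catch (ExecutionException e) {
            // get() wraps the root exception; the future reports exceptional completion.
            Throwable root = e.getCause();
            return cf.isCompletedExceptionally() + " "
                    + root.getClass().getSimpleName() + " " + root.getMessage();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(failureReport()); // prints "true IllegalStateException boom"
    }
}
```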

Exception handling
Suppose CF30 has been created with exceptionally()
If CF21 completes normally, then CF30 just transmits the value
If it raises an exception, then CF30 handles it and generates a value for CF31
[Diagram: chain of stages CF1, CF21, CF22, CF30, CF31, CF32, CF41; CF30 is created with exceptionally()]
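
The recovery step described above, as a short sketch. The stage roles in the comments follow the slide's CF30 / CF31 naming; the `fail` flag and the values are assumptions.

```java
import java.util.concurrent.CompletableFuture;

public class ExceptionallySketch {

    static String compute(boolean fail) {
        return CompletableFuture
                .supplyAsync(() -> {
                    if (fail) {
                        throw new IllegalStateException("boom");
                    }
                    return "value";
                })
                .exceptionally(error -> "fallback")  // plays the role of CF30
                .thenApply(v -> v + "!")             // the next stage, CF31 in the slide
                .join();
    }

    public static void main(String[] args) {
        System.out.println(compute(false)); // prints value!
        System.out.println(compute(true));  // prints fallback!
    }
}
```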

Conclusion
We have an API for async computations in the JDK!
Very rich: many methods, which makes it complex
Built on lambdas
Gives fine control over threads
Handles chaining, composition
Very clean way of handling exceptions