UNDERSTANDING REACTIVE PROGRAMMING ANDRES ALMIRAY @AALMIRAY ANDRESALMIRAY.COM
@aalmiray
@aalmiray JCP Executive Committee Associate Seat Committer Committer JSR377 Specification Lead
@aalmiray CONCEPTS
@aalmiray STATE Imperative Programming (Procedural) var a := b + c println a Declarative Programming (Functional) (println (sum(b, c))
@aalmiray TIME Imperative Programming var a := b + c println a Reactive Programming var a := b + c a.done( v -> println v)
@aalmiray SYNC VS ASYNC Synchronous operations block the flow until they yield a result (success or error). Asynchronous operations DO NOT block the flow, rather they allow the program to continue. Results (or errors) will be handled at a later point in time, typically using functions (callbacks).
@aalmiray FUTURES AND PROMISES They describe an object that acts as mediator for a value that may be unknown at init time. The terms Future and Promise are normally used interchangeably but there’s a difference: • A Future is a read-only reference of the expected value. • A Promise is a write-once container that sets the value of the Future. https://en.wikipedia.org/wiki/Futures_and_promises
@aalmiray FUTURES AND PROMISES Futures in Java are synchronous, this can be demonstrated by the following method signatures found in java.util.concurrent.Future V get() V get(long timeout, TimeUnit unit)
@aalmiray FUTURES AND PROMISES Java 8 added a new type, CompletableFuture, that implements CompletableStage, which in turns defines the contract of a Promise, such as thenAccept(Consumer<? Super T> action) whenComplete(BiConsumer<? super T,? super Throwable> action) exceptionally(Function<Throwable,? extends T> fn) … and many others
@aalmiray FUTURES AND PROMISES JDeferred offers a different API that allows better function composition Promise<D, F, P> then(DoneCallback<D> doneCallback) Promise<D, F, P> done(DoneCallback<D> callback) Promise<D, F, P> fail(FailCallback<F> callback) Promise<D, F, P> always(AlwaysCallback<D, F> callback)
@aalmiray EXAMPLES
@aalmiray GET /orgs/${organization}/repos data as JSON https://developer.github.com/v3/
@aalmiray CODE AVAILABLE AT https://github.com/aalmiray/javatrove/
@aalmiray IMPERATIVE :’(
@aalmiray (1) PRODUCER public List<Repository> repositories(final String organization) { try { Response<List<Repository>> response = api.repositories(organization).execute(); if (response.isSuccessful()) { return response.body(); } throw new IllegalStateException(response.message()); } catch (IOException e) { throw new IllegalStateException(e); } }
@aalmiray (2) CONSUMER public class AppController { @Inject private AppModel model; @Inject private GithubAPI api; public void loadRepositories() { model.setState(RUNNING); String organization = model.getOrganization(); try { List<Repository> repositories = repositories(organization); model.getRepositories().addAll(repositories); } catch (Throwable throwable) { handleError(throwable); } finally { model.setState(READY); } } }
@aalmiray PROMISES ;-) (JAVA8)
@aalmiray (1) PRODUCER public CompletableFuture<List<Repository>> repositories(final String organization) { Supplier<List<Repository>> supplier = () -> { try { Response<List<Repository>> response = api.repositories(organization).execute(); if (response.isSuccessful()) { return response.body(); } throw new IllegalStateException(response.message()); } catch (IOException e) { throw new IllegalStateException(e); } }; return CompletableFuture.supplyAsync(supplier, executorService); }
@aalmiray (2) CONSUMER public class AppController { @Inject private AppModel model; @Inject private Github github; public void loadRepositories() { model.setState(RUNNING); github.repositories(model.getOrganization()) .thenAccept(model.getRepositories()::addAll) .exceptionally(throwable -> { handleError(throwable); return null; }) .thenAccept(result -> model.setState(READY)); } }
@aalmiray PROMISES ;-) (JDEFERRED)
@aalmiray (1) PRODUCER public Promise<Collection<Repository>, Throwable, Void> repositories(final String organization) { return deferredManager.when(() -> { Response<List<Repository>> response = api.repositories(organization).execute(); if (response.isSuccessful()) { return response.body(); } throw new IllegalStateException(response.message()); }); }
@aalmiray (2) CONSUMER public class AppController { @Inject private AppModel model; @Inject private Github github; public void loadRepositories() { model.setState(RUNNING); github.repositories(model.getOrganization()) .done(model.getRepositories()::addAll) .fail(this::handleError) .always((state, resolved, rejected) -> model.setState(READY)); } }
@aalmiray REACTIVE :D (RXJAVA)
@aalmiray (1) PRODUCER public Observable<Repository> repositories(String organization) { Observable<Response<List<Repository>>> observable = api.repositories(organization); return observable.flatMap(response -> { if (response.isSuccessful()) { return Observable.fromIterable(response.body()); } return Observable.error(new HttpResponseException( response.code(), response.message())); }); }
@aalmiray (2) CONSUMER public class AppController { @Inject private AppModel model; @Inject private Github github; public void load() { Observable<Repository> observable = github.repositories(model.getOrganization()); observable .timeout(10, TimeUnit.SECONDS) .doOnSubscribe(disposable -> model.setState(RUNNING)) .doOnTerminate(() -> model.setState(READY)) .doOnError(this::handleError) .subscribeOn(Schedulers.io()) .subscribe(model.getRepositories()::add)); } }
@aalmiray GET /orgs/${organization}/repos data as JSON data as JSON https://developer.github.com/v3/ GET /organizations/1/repos?page=2
@aalmiray MULTPLE PAGES public Observable<Repository> repositories(String organization) { return paginatedObservable( () -> { return api.repositories(organization); }, (Links links) -> { return api.repositoriesPaginate(links.next()); }); }
@aalmiray MULTIPLE PAGES <T> Observable<T> paginatedObservable(FirstPageSupplier<T> firstPage, NextPageSupplier<T> nextPage) { return processPage(nextPage, firstPage.get()); } <T> Observable<T> processPage(NextPageSupplier<T> supplier, Observable<Response<List<T>>> items) { return items.flatMap(response -> { Links links = Links.of(response.headers().get("Link")); Observable<T> currentPage = Observable.from(response.body()); if (links.hasNext()) { return currentPage.concatWith( processPage(supplier, supplier.get(links))); } return currentPage; }); }
@aalmiray HTTP://REACTIVEX.IO/ Reactive Programming API, provides observable data flows. Multiple implementations in more than 20 programming languages Java adn Javascript implement the http://www.reactive- streams.org/ specification
@aalmiray HTTP://REACTIVEX.IO/ Java implementations: https://github.com/ReactiveX/RxJava Created by Netflix https://projectreactor.io Sponsored by Pivotal (Spring framework)
@aalmiray DATA STREAMS
@aalmiray DATA STREAMS A sequence of values calculated over time. Values are emitted when ready; they are computed without blocking consumers. Consumers listen to changes in the data stream and react to said changes (hence the name Reactive Programming). Push vs pull model.
@aalmiray OBSERVABLE/OBSERVER Data streams are of type Observable while consumer are of type Observer. The Observable type exposes multiple operations that allow value composition, combinations, filtering, and other value transformations. NOTE: many operations from RxJava/Reactor generate a new Observable (decorator pattern)
@aalmiray OPERATIONS (A SMALL SAMPLE OF)
@aalmiray HTTP://RXMARBLES.COM
@aalmiray HTTP://RXMARBLES.COM
@aalmiray HTTP://RXMARBLES.COM
@aalmiray HTTP://RXMARBLES.COM
@aalmiray HTTP://REACTIVEX.IO/RXJAVA/2.X/JAVADOC/INDEX.HTML
@aalmiray FUNCTIONAL PROGRAMMING
@aalmiray FUNCTIONS Functions should follow two concepts: • Do not produce side effects during invocation. • Given the same input they must produce the same output.
@aalmiray IMMUTABILITY Immutable data structures and/or data containers (POJOs) complement Functional Programming in a natural way. They allow data sharing with multiple consumers and/or functions without fear of changes affecting the original data. They help reduce synchronization points.
@aalmiray OTHER OPTIONS http://vertx.io/ Toolkit for reactive applications Based on Netty. https://grpc.io/ RPC framework. Supports data streaming both ways
@aalmiray RXGRPC https://github.com/salesforce/reactive-grpc Combines gRPC with Reactive Streams Encapsulates gRPC’s API Producers/Consumers only see Rx API
@aalmiray THE FUTURE
@aalmiray JAVA 9+ FLOW Java 9 implements the Reactive Streams in its own way with java.util.concurrent.Flow This API allows combining streams from different providers (RxJava, Reactor, etc). Additions to the CompletableStage/CompletableFuture API.
@aalmiray REACTIVE SPRING (WEB) @GetMapping("/accounts/{id}/alerts") public Flux<Alert> getAccountAlerts(@PathVariable Long id) { return this.repository.getAccount(id) .flatMap(account -> this.webClient .perform(get("/alerts/{key}", account.getKey())) .extract(bodyStream(Alert.class))); } https://spring.io/blog/2016/07/28/reactive-programming-with-spring-5-0-m1
@aalmiray REACTIVE SPRING (DATA+WEB) @RestController class PersonController { private final PersonRepository people; public PersonController(PersonRepository people) { this.people = people; } @GetMapping("/people") Flux<String> namesByLastname(@RequestParam Mono<String> lastname) { Flux<Person> result = repository.findByLastname(lastname); return result.map(it -> it.getFullName()); } } https://spring.io/blog/2016/11/28/going-reactive-with-spring-data
@aalmiray RESOURCES http://download.java.net/java/jdk9/docs/api/java/util/concurrent/Flow.ht ml http://docs.oracle.com/javase/8/docs/api/java/util/concurrent/Completab leFuture.html http://jdeferred.org/ http://andresalmiray.com/articles/jdeferred-simple-handling-of- promises-and-futures/ http://andresalmiray.com/articles/testing-rxjava2/ https://www.infoq.com/articles/rxjava2-by-example
@aalmiray REACTIVE PROGRAMMING IMPLIES AND PARADIGM CHANGE!
@aalmiray HTTP://ANDRESALMIRAY.COM/NEWSLETTER HTTP://ANDRESALMIRAY.COM/EDITORIAL
@aalmiray
THANK YOU! ANDRES ALMIRAY @AALMIRAY ANDRESALMIRAY.COM

Understanding Reactive Programming