REACTIVE PROGRAMMING WITH NETBEANS AND JAVA 8 Stefan Reuter
22016-05-31 STEFAN REUTER @StefanReuter ● IT Architect ● Trainer and Consultant for trion development GmbH – www.trion.de
32016-05-31 trion www.trion.de ● Professional services ● Architecture ● Consulting ● Development ● Training
42016-05-31 reactive programming is programming with asynchronous data streams
52016-05-31 AGENDA Subscribing Integrating Existing Code Hot vs. Cold Concurrency Item Creating Introduction Composable Functions
62016-05-31 AGENDA Subscribing Integrating Existing Code Hot vs. Cold Concurrency Item Creating IntroductionIntroductionIntroduction Composable Functions
72016-05-31 REACTIVE MANIFESTO Responsive Resilient Message Driven Elastic “[…] Systems built as Reactive Systems are more flexible, loosely-coupled and scalable. This makes them easier to develop and amenable to change. They are significantly more tolerant of failure and when failure does occur they meet it with elegance rather than disaster. Reactive Systems are highly responsive, giving users effective interactive feedback. […]” From: Reactive Manifesto, Version 2, September 2014
82016-05-31 REACTIVE SCENARIOS I/O Push Events User Events  Mouse movement and clicks  Keyboard typing  Changing GPS signals as a user moves with a device  Touch events  Latency-bound I/O events from disk  Data sent or received via network  Distributed and cloud-based systems  System events received from a server  Signals received from hardware  Events triggered by sensors  Data received from IoT
92016-05-31 JAVA.UTIL.ITERATOR Iterator hasNext next hasNext next ...
102016-05-31 OBSERVABLE AND SUBSCRIBER Subscriber Observable subscribe onNext* onCompleted or: onError OnNext* (OnCompleted|OnError)?
112016-05-31 ITERATOR vs. OBSERVABLE T next() onNext(T t) boolean hasNext() onCompleted() throws RuntimeException onError(Throwable e) Iterator Observable  blocking  synchronous  pull  non-blocking  asynchronous  push
122016-05-31Source: Reactive Programming with RxJava, O'Reilly 2016
132016-05-31 CREATING
142016-05-31 CREATING OBSERVABLES Edge cases Custom Pre defined values  Observable.just(T value)  Observable.from(T[] values) / from(Iterable<T> values)  Observable.range(int start, int count)  Observable.interval()  Observable.empty()  Observable.never()  Observable.error(Throwable exception)  Observable.create(OnSubscribe<T> f) Observable.create(subscriber -> { subscriber.onNext(“Hello World”); subscriber.onCompleted(); }
152016-05-31 SUBSCRIBING
162016-05-31 SUBSCRIBE (1/2)  Observables do not emit items until someone subscribes  Observables can have numerous subscribers (unlike a Java 8 stream that can only be consumed once) Observable<Integer> o = Observable.range(1,3); o.subscribe(System.out::println); o.subscribe(System.out::println);
172016-05-31 SUBSCRIBE (2/2) o.subscribe(new Observer<Integer>() { public void onNext(Integer t) { } public void onError(Throwable e) { } public void onCompleted() { } }); o.subscribe(System.out::println, Throwable::printStackTrace, this::completed); o.subscribe(i -> logger.info("{}", i));
182016-05-31 SHARING A SUBSCRIPTION ConnectableObservable<> co = o.publish(); co.subscribe(…); co.subscribe(…); co.subscribe(…); co.connect(); // or: co.refCount();  ConnectableObservable created via publish()  starts emitting items when connect() is called instead of when it is subscribed to
192016-05-31 SUBSCRIPTION  subscribing to an Observable returns a Subscription  allows client code to cancel subscription and query status Subscription subscription = tweets.subscribe(System.out::println); //... subscription.unsubscribe(); //subscription.isUnsubscribed() == true
202016-05-31 SUBSCRIBER  Subscriber is an abstract implementation of Observer and Subscription  can unsubscribe itself o.subscribe(new Subscriber<Tweet>() { @Override public void onNext(Tweet tweet) { if (tweet.getText().contains("Java")) { unsubscribe(); } } ...
212016-05-31
222016-05-31 HOT vs. COLD Hot Cold  Entirely lazy  Never starts emiting before someone subscribes  Every subscriber receives its own copy of the stream thus events are produced independently for each subscriber  Examples: Observable.just(), from(), range()  Subscribing often involves a side effect (e.g. db query)  Emits events whether subscribers are present or not  Emits same events to all subscribers  Examples: mouse movements, keyboard input, button clicks Same API for hot and cold Observables
232016-05-31 INTEGRATING EXISTING CODE
242016-05-31 WRAPPING AN EXISTING ASYNC API
252016-05-31 WRAPPING AN EXISTING SYNC API (1/3) public Observable<Hotel> listHotels() { return Observable.from( query("SELECT * FROM HOTELS") ); }  blocks until all data has been loaded  is not lazy, i.e. loads data before there is any subscription
262016-05-31 WRAPPING AN EXISTING SYNC API (2/3) public Observable<Hotel> listHotels() { return Observable.defer(() -> Observable.from( query("SELECT * FROM HOTELS") )); }  Pass a lambda to Observable.defer()  Database is no longer queried until someone subscribes
272016-05-31 WRAPPING AN EXISTING SYNC API (3/3) Observable<Booking> getBooking(String id) { return Observable.defer(() -> Observable.just( restTemplate.getForObject( "http://example.com/bookings/{id}", Booking.class, id) )); }
282016-05-31 CONVERT TO BLOCKING public Observable<Hotel> listHotels() { // ... } Observable<Hotel> o = listHotels(); Observable<List<Hotel>> hotelList = o.toList(); BlockingObservable<List<Hotel>> block = hotelList.toBlocking(); List<Hotel> people = block.single(); // short: List<Hotel> o = listHotels() .toList() .toBlocking() .single();
292016-05-31 SWITCH BETWEEN SYNC AND ASYNC Sync Blocking Pull Async Non Blocking Push Observable.defer() BlockingObservable .toList() .toBlocking() .single();
302016-05-31 CONCURRENCY
312016-05-31 USING SCHEDULERS observeOn() create() subscribeOn()  can be called any time before subscribing  the function passed to Observable.create() is executed in the thread provided by the given scheduler  controls which scheduler is used to invoke downstream subscribers occurring after observeOn()  scheduler can be passed as an additional argument to Observable.create() Scheduler sched = Schedulers.newThread(); observable.subscribeOn(sched);
322016-05-31 SCHEDULERS (1/2) io() computation() newThread()  starts a new thread each time subscribeOn() or observeOn() is called  increased latency due to thread start  usually not a good choice as threads are not reused  threads are reused  unbounded pool of threads  useful for I/O bound tasks which require very little CPU resources  limits number of threads running in parallel  default pool size == Runtime.availableProcessors()  useful for tasks thar are entirely CPU-bound, i.e. they require computational power and have no blocking code
332016-05-31 SCHEDULERS (2/2) immediate() test() from(Executor exc)  wrapper around java.util.concurrent.Executor  invokes tasks in the client thread  blocking  usually not needed because this is the default behavior  only used for testing  allows arbitrary changes to the clock for simulation ExecutorService executor = Executors.newFixedThreadPool(10); Scheduler s = Schedulers.from(executor);
342016-05-31 NOTES ON CONCURRENCY  Callbacks will be invoked from only one thread at a time, but events can be emitted from many threads  Concurrency often comes already from the source of events so explicit use of schedulers should be a last resort
352016-05-31 COMPOSABLE FUNCTIONS
362016-05-31 COMPOSABLE FUNCTIONS (1/2) Filter Combine Transform  map, flatMap  groupBy, buffer  window  ...  take, skip, last  distinct  filter  ...  concat, merge, zip, combineLatest  multicast, publish  switchOnNext  ...
372016-05-31 COMPOSABLE FUNCTIONS (2/2) Error Handling Custom Concurrency  observeOn  subscribeOn  onErrorReturn  onErrorResumeNext  retry  ...  any public class that implements the Operator interface  or a subclass like Transformer  most likely you will never need this!
382016-05-31 MAP
392016-05-31 BUFFER
402016-05-31 TWEETS PER SECOND Observable<Integer> o = tweets() .buffer(1, TimeUnit.SECONDS) .map(items -> items.size()) Observable<Integer> itemsPerSecond (Observable<?> o) { return o.buffer(1, TimeUnit.SECONDS) .map(items -> items.size()); }
412016-05-31 IMPLEMENTATIONS ON THE JVM Project Reactor 2.5 Java 9 j.u.c.Flow RxJava  Reactive Extensions (ReactiveX.io) for the JVM  Zero Dependencies  Polyglot (Scala, Groovy, Clojure and Kotlin)  RxJava 2: Java 8+ and Reactive Streams compatible  Spring Ecosystem  Reactive Streams compatible  Will become part of Spring Framework 5.0  Flow.Processor, Publisher, Subscriber and Subscription  Interfaces correspond to Reactive Streams specification
422016-05-31 Mobile Devices Android Sensors Desktop JavaScript Angular2 JavaFX BigData Cloud IoT Microservices SMALL TO LARGE SCALE Concept applies to all scales
432016-05-31  You can start using reactive program- ming right away as you can always convert back using BlockingObservable  Have a look at http://reactivex.io/  Check supporting libraries like RxNetty and RxJs SUMMARY reactive programming is a powerful option that is available today
442016-05-31 Questions?
452016-05-31 Thank you for your attention I am looking forward to answer your questions at stefan.reuter@trion.de @StefanReuter
BACKUP
472016-05-31 RxNetty HTTP CLIENT SocketAddress serverAddress = ... Charset charset = Charset.defaultCharset(); HttpClient.newClient(serverAddress) .enableWireLogging(LogLevel.DEBUG) .createGet("/api/talks") .doOnNext(resp -> logger.info(resp.toString())) .flatMap( resp -> resp.getContent() .map(bb -> bb.toString(charset)) ) .toBlocking() .forEach(logger::info);

Reactive Programming with NetBeans and Java 8