Reactive Thinking in Java
 with RxJava2 Yakov Fain, Farata Systems
 
 @yfain
About myself • Solutions Architect at Farata Systems • Java Champion • Latest book:
 “Angular 2 Development with TypeScript”

The problem with not being reactive int a1 = 2;
 int b1 = 4;
 
 int c1 = a1 + b1; // c1 = 6
 

int a1 = 2;
 int b1 = 4;
 
 int c1 = a1 + b1; // c1 = 6
 
 a1 = 55; // c1 = 6;
 b1 = 20; // c1 = 6;
 The problem with not being reactive
A famous reactive solution
But we face more challenges • An async request to a server failed later • You need to chain multiple async calls • An async response came back, but the user already moved to a different view • An async request was executed on a thread A, but the UI must updated on a main thread • Allocating a thread to each async request is expensive • How to avoid blocking code in a multi-threaded app?
and even more challenges • The same stream has to be processed by several consumers • The producer pushes the data that should flow through several composable functions • Mobile device. Slow connection. An Http request is pending, but the user made another one. How to kill the pending request? • The publisher generates data faster than a consumer can handle
Backpressure Publisher Subscriber Publisher generates more data than subscriber can (or want) process
Reactive Apps • The data is processed as streams and not via iterating over the in-memory data • Message-driven: components communicate via direct notifications • A stream can be handled by one or more composable operators (functions). • Resilient: stay responsive in case of failures • The data flows through your app’s algorithm • Hide complexity of multi-threading
Reactive Streams • Reactive Streams is a spec for async stream processing with non- blocking backpressure
 
 http://www.reactive-streams.org
 • Reactive Streams interfaces for JVM 
 
 https://github.com/reactive-streams/reactive-streams-jvm • Reactive Streams-based products: 
 
 RxJava2, Java 9 Flow APIs, Reactor 3, Akka, MongoDB, Vert.x …
Reactive Streams: Java interfaces 1 2 3 4 backpressure
 support
Examples of backpressure • The stock price may change hundreds times a second, but the user’s display should change once a second. • Android accelerometer produces 50 readings a second, but your app should react to one signal per second. • Iteration through a JDBC result set (rxjava-jdbc; rxjava2-jdbc) • A user drags the mouse to draw a curve. Can’t apply back pressure.
Handling backpressure Publisher Subscriber request(1) request(3) … Even if the publisher is slow and the data is not be available, 
 the request() doesn’t block. onNext(value1) onNext(value2) onNext(value3) onNext(value4)
Rx libraries • RxJava (end of life: March 2018)
 RxAndroid, RxJavaFX, RxSwing • RxJava2 • Other Rx libraries:
 Rx.NET, RxCpp, RxJS, Rx.rb, Rx.py, RxSwift, RxScala, RxPHP http://reactivex.io
Main RxJava2 players • Observable or Flowable - producers of data • Observer or Subscriber - consumers of data • Subject or Processor - implements producer and consumer • Operator - en-route data transformation • Scheduler - multi-threading support
Main Publishers and Subscribers in RxJava2 Observable
 no backpressure support 
 public interface Observer<T> {
 void onSubscribe(Disposable var1);
 
 void onNext(T var1);
 
 void onError(Throwable var1);
 
 void onComplete();
 } Publisher Subscriber Not from 
 Reactive Streams
Observable
 no backpressure support Flowable
 with backpressure support public interface Observer<T> {
 void onSubscribe(Disposable var1);
 
 void onNext(T var1);
 
 void onError(Throwable var1);
 
 void onComplete();
 } public interface Subscriber<T> {
 void onSubscribe(Subscription var1);
 
 void onNext(T var1);
 
 void onError(Throwable var1);
 
 void onComplete();
 } Not from 
 Reactive Streams From 
 Reactive Streams Main Publishers and Subscribers in RxJava2 Publisher Subscriber
beers.forEach(beer -> {
 if ("USA".equals(beer.country)){
 americanBeers.add(beer);
 }
 }); Java Iterable: a pull
beers.stream()
 .skip(1)
 .limit(3)
 .filter(beer -> "USA".equals(beer.country))
 .map(beer -> beer.name + ": $" + beer.price) 
 .forEach(beer -> System.out.println(beer));
 Java 8 Stream API: a pull
A pull with a tool is still a pull
Observable<Beer> observableBeer = Observable.create(/* data source */); observableBeer
 .skip(1)
 .take(3)
 .filter(beer -> "USA".equals(beer.country))
 .map(beer -> beer.name + ": $" + beer.price)
 .subscribe(
 beer -> System.out.println(beer),
 
 err -> System.out.println(err),
 
 () -> System.out.println("Streaming is complete”),
 
 disposable -> System.out.println( 
 "Someone just subscribed to the beer stream!!!”)
 );
 ); Rx Observable: a push O b s e r v e r Subscribtion
Adding RxJava2 to your project Or find rxjava2 and reactive-streams jars on search.maven.org <dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>x.y.z</version> </dependency> Maven:
Creating an Observable • Observable.create() • Observable.fromArray() • Observable.fromIterable() • Observable.fromCallable() • Observable.fromFuture() • Observable.range() • Observable.just()
Synchronous push List<Beer> beerStock = new ArrayList<>(); … Observable.create(subscriber -> {
 
 int totalBeers = beerStock.size();
 for (int i = 0; i < totalBeers; i++) {
 
 subscriber.onNext(beerStock.get(i));
 }
 
 subscriber.onComplete();
… Observable.create(subscriber -> {
 
 myHttpClient.getBeers(new Callback(){
 public void onResponse(Response res){
 subscriber.onNext(res.body().string()); subscriber.onComplete(); 
 } public void onFailure (IOException e){ subscriber.onError(e); } } }); Asynchronous push
Creating an Observer and subscribing Observable<Beer> beerData = BeerServer.getData(); // returns Observable
 
 Observer beerObserver = new Observer<Beer>() {
 
 public void onSubscribe(Disposable d) {
 System.out.println( " !!! Someone just subscribed to the bear stream!!! ");
 
 // If the subscriber is less than 21 year old, cancel subscription
 // d.dispose();
 }
 
 public void onNext(Beer beer) {
 System.out.println(beer);
 }
 
 public void onError(Throwable throwable) {
 System.err.println("In Observer.onError(): " + throwable.getMessage());
 }
 
 public void onComplete() {
 System.out.println("*** The stream is over ***");
 }
 };
 
 beerData
 .subscribe(beerObserver); // Streaming starts here
Demo 
 BeerClient
Specialized Observables • Single - Emits a exactly one item or sends an error • Completable - Emits either complete or error - no data
 Any response is better than no response! • Maybe - Either emits exactly one item, or completes with 
 no items, or sends an error
Controlling the flow with request() request(1); request(1);
Flowables and backpressure strategies • BackpressureStrategy.BUFFER - process what you can; put the rest in the buffer 
 until the next request. • BackpressureStrategy.DROP - process what you can; ignore the rest until the 
 next request. • BackpressureStrategy.LATEST - process what you can; ignore the rest until the 
 next request, but cache the latest element
 • BackpressureStrategy.MISSING - don’t apply backpressure; if consumer can’t keep
 up, it may throw MissingBackpressureException or IllegalStateException • BackpressureStrategy.ERROR - apply backpressure; if consumer can’t keep up,
 it throws MissingBackpressureException
The BUFFER strategy BackpressureStrategy.BUFFER
BackpressureStrategy.DROP The DROP strategy
BackpressureStrategy.LATEST The LATEST strategy
Creating a Flowable • Flowable.create() • Flowable.fromArray() • Flowable.fromIterable() • Flowable.fromCallable() • Flowable.empty() • Flowable.range() • Flowable.just()
Creating a Flowable • Flowable.create() • Flowable.fromArray() • Flowable.fromIterable() • Flowable.fromCallable() • Flowable.fromFuture() • Flowable.empty() • Flowable.range() • Flowable.just() myObservable
 .toFlowable(BackpressureStrategy.BUFFER) Flowable<Beer> myFlowable
 .create(beerEmitter ->{…},
 BackpressureStrategy.BUFFER) Create Convert
Requesting data from Flowable public class FlowableRange {
 
 static DisposableSubscriber<Integer> subscriber;
 
 public static void main(String[] args) {
 
 subscriber = new DisposableSubscriber<Integer>() {
 
 public void onStart() {
 request(5);
 
 while (true){ // Emulate some 1-sec processing
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 request(1);
 }
 }
 
 public void onNext(Integer t) {
 System.out.println("processing "+ t);
 if (t==8) {
 subscriber.dispose();
 }
 }
 
 public void onError(Throwable thr) {
 System.err.println("In onError(): " + thr.getMessage()); 
 }
 
 public void onComplete() {
 System.out.println("Done");
 }
 };
 
 Flowable.range(1, 10)
 .subscribe(subscriber);
 }
 }
Demos 
 1. FlowableRange
 2. BeerClientFlowable
FP: a pure function • Produces no side effects • The same input always results in the same output • Doesn’t modify the input • Doesn’t rely on the external state Sharing state require locks, may cause race conditions, and complicates programming
FP: a higher-order function • Can take one or more functions as argument(s) • Can return a function
An Operator Observable Observable A transforming
 function
An Operator Observable Observable A transforming
 function observableBeer
 .filter(b -> "USA".equals(b.country))
An Operator Observable Observable A transforming
 function A higher-order function observableBeer
 .filter(b -> "USA".equals(b.country)) A pure function
map
filter
RX: the data moves across your algorithm
Observable Operator Observable Operator Observable Operator Observable Operator
QUIZ: What value(s) this observable will emit? Observable
 .just(5, 9, 10)
 .filter(i -> i % 3 > 0)
 .map(n -> "$" + n * 10)
 .filter(s -> s.length() < 4);

 Observable
 .just(5, 9, 10)
 .filter(i -> i % 3 > 0)
 .map(n -> "$" + n * 10)
 .filter(s -> s.length() < 4) .subscribe(System.out::println); QUIZ: What value(s) this observable will emit?
Functions with side effects • doOnNext() • doOnError() • doOnComplete() • doOnEach() • doOnSubscribe() • doOnDispose() Affect environment outside the function.
Error reporting Observer Observable onNext() onError() onComplete() When the Observable or Flowable throws an exception it still invokes Observer.onError() or Subscriber.onError()
Error-handling operators • onError() kills the subscription • retryWhen() - intercept and analyze the error; resubscribe • retryUntil() - retry until a certain condition is true • onErrorResumeNext() - used for failover to another Observable
Demo 
 BeerClientWithFailover
Composing observables
concat: combining observables of the same type in order
merge: combining observables of the same type
zip: combining observables of different types
flatMap
.flatMap()Observable Can spawn multiple threads
Demo 
 composingObservables/ObservableDrinks
switchMap
Demo 
 Angular client, weather app
Schedulers
Concurrency with Schedulers • subscribeOn(strategy) - run Observable in a separate thread: 
 Operations with side effects like files I/O;
 Don’t hold off the UI thread
 • observeOn(strategy) - run Observer in a separate thread,
 Update Swing UI on the Event Dispatch Thread
 Update Android UI on the Main thread
Multi-threading strategies • Schedulers.computation() - for computations: # of threads <= # of cores • Schedulers.io() - for long running communications; backed by a thread pool • Schedulers.newThread() - new thread for each unit of work • Schedulers.from(Executor) - a wrapper for Java Executor • Scedulers.trampoline() - queues the work on the current thread • AndroidSchedulers.mainThread() - handle data on the main thread (RxAndroid)
Switching threads Operator1() Operator2() ObserveOn() Observable Subscriber Thread 1 Thread 2 subscribeOn() observeOn()
Multi-threaded processing with flatMap() Operator1() Operator2() flatMap() Observable Subscriber Thread 1 Observable/Thr2 Observable/Thr3 Observable/ThrN With flatMap() it’s easy to spawn a different thread to each observable …
Turning a value into
 an observable Observable.range(1, 1000) .flatMap(n->Observable.just(n)
 .subscribeOn(Schedulers.computation()))
 .map(n->n*2)
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(myView.update()); Subscribing to each observable on the computation thread Displaying the result 
 on the main thread
Demo 
 schedulers/SubscribeOnObserveOn
 ParallelStreamsRange
 ParallelStreams
Parallel operations with ParallelFlowabe • ParallelFlowable allows parallel execution of a few operators • The source sequence is dispatched into N parallel “rails” • More efficient forking and joining than with flatMap()
 runOn() —-> sequential() • Each rail can spawn multiple async threads with Schedulers
Parallel operations with ParallelFlowabe int numberOfRails = 4; // can query #processors with parallelism()
 
 ParallelFlowable
 .from(Flowable.range(1, 10), numberOfRails)
 .runOn(Schedulers.computation())
 .map(i -> i * i)
 .filter(i -> i % 3 == 0)
 .sequential()
 .subscribe(System.out::println); Demo: ParallelFlowableRange
Types of Observables Hot Cold Push Produce the steam even
 if no-one cares r Produce the stream when 
 someone asks for it ✔
Hot Observables • Mouse-generated events are emitted even if there are no subscribers • Button clicks • Stock prices are being published on a server socket regardless if any client is connected • A sensor in a mobile device sends signals even if no apps are handling them
Making observables hot
Using publish() and connect() ConnectableObservable<Long> numbers = (ConnectableObservable<Long>) Observable
 .interval(1, TimeUnit.SECONDS) // generate seq numbers every second
 .publish(); // make it hot
 
 numbers.connect(); // creates an internal subscriber to start producing data
 numbers.subscribe(n ->System.out.println("First subscriber: "+ n ));
 
 Thread.sleep(3000);
 
 numbers.subscribe(n ->System.out.println(" Second subscriber: "+ n ));

Demo 
 HotObservable
Summary • Observable: no backpressue; no reactive streams spec • Flowable: backpressure; reactive streams spec • Operators can be chained • flatmap() used for handling observable of observables • Schedulers support multi-threading • subscribeOn()/observeOn() - switching between threads • ParallelFlowable - initiate parallel processing • Observables can be hot or cold
Thank you! • Code samples: 
 https://github.com/yfain/rxjava2 • Our company: faratasystems.com • My blog: yakovfain.com • Twitter: @yfain


Reactive Thinking in Java with RxJava2

  • 1.
    Reactive Thinking inJava
 with RxJava2 Yakov Fain, Farata Systems
 
 @yfain
  • 2.
    About myself • SolutionsArchitect at Farata Systems • Java Champion • Latest book:
 “Angular 2 Development with TypeScript”

  • 3.
    The problem withnot being reactive int a1 = 2;
 int b1 = 4;
 
 int c1 = a1 + b1; // c1 = 6
 

  • 4.
    int a1 =2;
 int b1 = 4;
 
 int c1 = a1 + b1; // c1 = 6
 
 a1 = 55; // c1 = 6;
 b1 = 20; // c1 = 6;
 The problem with not being reactive
  • 5.
  • 6.
    But we facemore challenges • An async request to a server failed later • You need to chain multiple async calls • An async response came back, but the user already moved to a different view • An async request was executed on a thread A, but the UI must updated on a main thread • Allocating a thread to each async request is expensive • How to avoid blocking code in a multi-threaded app?
  • 7.
    and even morechallenges • The same stream has to be processed by several consumers • The producer pushes the data that should flow through several composable functions • Mobile device. Slow connection. An Http request is pending, but the user made another one. How to kill the pending request? • The publisher generates data faster than a consumer can handle
  • 8.
    Backpressure Publisher Subscriber Publisher generatesmore data than subscriber can (or want) process
  • 9.
    Reactive Apps • Thedata is processed as streams and not via iterating over the in-memory data • Message-driven: components communicate via direct notifications • A stream can be handled by one or more composable operators (functions). • Resilient: stay responsive in case of failures • The data flows through your app’s algorithm • Hide complexity of multi-threading
  • 10.
    Reactive Streams • ReactiveStreams is a spec for async stream processing with non- blocking backpressure
 
 http://www.reactive-streams.org
 • Reactive Streams interfaces for JVM 
 
 https://github.com/reactive-streams/reactive-streams-jvm • Reactive Streams-based products: 
 
 RxJava2, Java 9 Flow APIs, Reactor 3, Akka, MongoDB, Vert.x …
  • 11.
    Reactive Streams: Javainterfaces 1 2 3 4 backpressure
 support
  • 12.
    Examples of backpressure •The stock price may change hundreds times a second, but the user’s display should change once a second. • Android accelerometer produces 50 readings a second, but your app should react to one signal per second. • Iteration through a JDBC result set (rxjava-jdbc; rxjava2-jdbc) • A user drags the mouse to draw a curve. Can’t apply back pressure.
  • 13.
    Handling backpressure Publisher Subscriber request(1) request(3) … Evenif the publisher is slow and the data is not be available, 
 the request() doesn’t block. onNext(value1) onNext(value2) onNext(value3) onNext(value4)
  • 14.
    Rx libraries • RxJava(end of life: March 2018)
 RxAndroid, RxJavaFX, RxSwing • RxJava2 • Other Rx libraries:
 Rx.NET, RxCpp, RxJS, Rx.rb, Rx.py, RxSwift, RxScala, RxPHP http://reactivex.io
  • 15.
    Main RxJava2 players •Observable or Flowable - producers of data • Observer or Subscriber - consumers of data • Subject or Processor - implements producer and consumer • Operator - en-route data transformation • Scheduler - multi-threading support
  • 16.
    Main Publishers andSubscribers in RxJava2 Observable
 no backpressure support 
 public interface Observer<T> {
 void onSubscribe(Disposable var1);
 
 void onNext(T var1);
 
 void onError(Throwable var1);
 
 void onComplete();
 } Publisher Subscriber Not from 
 Reactive Streams
  • 17.
    Observable
 no backpressure support Flowable
 withbackpressure support public interface Observer<T> {
 void onSubscribe(Disposable var1);
 
 void onNext(T var1);
 
 void onError(Throwable var1);
 
 void onComplete();
 } public interface Subscriber<T> {
 void onSubscribe(Subscription var1);
 
 void onNext(T var1);
 
 void onError(Throwable var1);
 
 void onComplete();
 } Not from 
 Reactive Streams From 
 Reactive Streams Main Publishers and Subscribers in RxJava2 Publisher Subscriber
  • 18.
    beers.forEach(beer -> {
 if("USA".equals(beer.country)){
 americanBeers.add(beer);
 }
 }); Java Iterable: a pull
  • 19.
    beers.stream()
 .skip(1)
 .limit(3)
 .filter(beer -> "USA".equals(beer.country))
 .map(beer-> beer.name + ": $" + beer.price) 
 .forEach(beer -> System.out.println(beer));
 Java 8 Stream API: a pull
  • 20.
    A pull witha tool is still a pull
  • 21.
    Observable<Beer> observableBeer =Observable.create(/* data source */); observableBeer
 .skip(1)
 .take(3)
 .filter(beer -> "USA".equals(beer.country))
 .map(beer -> beer.name + ": $" + beer.price)
 .subscribe(
 beer -> System.out.println(beer),
 
 err -> System.out.println(err),
 
 () -> System.out.println("Streaming is complete”),
 
 disposable -> System.out.println( 
 "Someone just subscribed to the beer stream!!!”)
 );
 ); Rx Observable: a push O b s e r v e r Subscribtion
  • 22.
    Adding RxJava2 toyour project Or find rxjava2 and reactive-streams jars on search.maven.org <dependency> <groupId>io.reactivex.rxjava2</groupId> <artifactId>rxjava</artifactId> <version>x.y.z</version> </dependency> Maven:
  • 23.
    Creating an Observable •Observable.create() • Observable.fromArray() • Observable.fromIterable() • Observable.fromCallable() • Observable.fromFuture() • Observable.range() • Observable.just()
  • 24.
    Synchronous push List<Beer> beerStock= new ArrayList<>(); … Observable.create(subscriber -> {
 
 int totalBeers = beerStock.size();
 for (int i = 0; i < totalBeers; i++) {
 
 subscriber.onNext(beerStock.get(i));
 }
 
 subscriber.onComplete();
  • 25.
    … Observable.create(subscriber -> {
 
 myHttpClient.getBeers(newCallback(){
 public void onResponse(Response res){
 subscriber.onNext(res.body().string()); subscriber.onComplete(); 
 } public void onFailure (IOException e){ subscriber.onError(e); } } }); Asynchronous push
  • 26.
    Creating an Observerand subscribing Observable<Beer> beerData = BeerServer.getData(); // returns Observable
 
 Observer beerObserver = new Observer<Beer>() {
 
 public void onSubscribe(Disposable d) {
 System.out.println( " !!! Someone just subscribed to the bear stream!!! ");
 
 // If the subscriber is less than 21 year old, cancel subscription
 // d.dispose();
 }
 
 public void onNext(Beer beer) {
 System.out.println(beer);
 }
 
 public void onError(Throwable throwable) {
 System.err.println("In Observer.onError(): " + throwable.getMessage());
 }
 
 public void onComplete() {
 System.out.println("*** The stream is over ***");
 }
 };
 
 beerData
 .subscribe(beerObserver); // Streaming starts here
  • 27.
  • 28.
    Specialized Observables • Single- Emits a exactly one item or sends an error • Completable - Emits either complete or error - no data
 Any response is better than no response! • Maybe - Either emits exactly one item, or completes with 
 no items, or sends an error
  • 29.
    Controlling the flowwith request() request(1); request(1);
  • 30.
    Flowables and backpressurestrategies • BackpressureStrategy.BUFFER - process what you can; put the rest in the buffer 
 until the next request. • BackpressureStrategy.DROP - process what you can; ignore the rest until the 
 next request. • BackpressureStrategy.LATEST - process what you can; ignore the rest until the 
 next request, but cache the latest element
 • BackpressureStrategy.MISSING - don’t apply backpressure; if consumer can’t keep
 up, it may throw MissingBackpressureException or IllegalStateException • BackpressureStrategy.ERROR - apply backpressure; if consumer can’t keep up,
 it throws MissingBackpressureException
  • 31.
  • 32.
  • 33.
  • 34.
    Creating a Flowable •Flowable.create() • Flowable.fromArray() • Flowable.fromIterable() • Flowable.fromCallable() • Flowable.empty() • Flowable.range() • Flowable.just()
  • 35.
    Creating a Flowable •Flowable.create() • Flowable.fromArray() • Flowable.fromIterable() • Flowable.fromCallable() • Flowable.fromFuture() • Flowable.empty() • Flowable.range() • Flowable.just() myObservable
 .toFlowable(BackpressureStrategy.BUFFER) Flowable<Beer> myFlowable
 .create(beerEmitter ->{…},
 BackpressureStrategy.BUFFER) Create Convert
  • 36.
    Requesting data fromFlowable public class FlowableRange {
 
 static DisposableSubscriber<Integer> subscriber;
 
 public static void main(String[] args) {
 
 subscriber = new DisposableSubscriber<Integer>() {
 
 public void onStart() {
 request(5);
 
 while (true){ // Emulate some 1-sec processing
 try {
 Thread.sleep(1000);
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 request(1);
 }
 }
 
 public void onNext(Integer t) {
 System.out.println("processing "+ t);
 if (t==8) {
 subscriber.dispose();
 }
 }
 
 public void onError(Throwable thr) {
 System.err.println("In onError(): " + thr.getMessage()); 
 }
 
 public void onComplete() {
 System.out.println("Done");
 }
 };
 
 Flowable.range(1, 10)
 .subscribe(subscriber);
 }
 }
  • 37.
  • 38.
    FP: a purefunction • Produces no side effects • The same input always results in the same output • Doesn’t modify the input • Doesn’t rely on the external state Sharing state require locks, may cause race conditions, and complicates programming
  • 39.
    FP: a higher-orderfunction • Can take one or more functions as argument(s) • Can return a function
  • 40.
    An Operator Observable Observable Atransforming
 function
  • 41.
    An Operator Observable Observable Atransforming
 function observableBeer
 .filter(b -> "USA".equals(b.country))
  • 42.
    An Operator Observable Observable Atransforming
 function A higher-order function observableBeer
 .filter(b -> "USA".equals(b.country)) A pure function
  • 43.
  • 44.
  • 45.
    RX: the datamoves across your algorithm
  • 46.
  • 47.
    QUIZ: What value(s)this observable will emit? Observable
 .just(5, 9, 10)
 .filter(i -> i % 3 > 0)
 .map(n -> "$" + n * 10)
 .filter(s -> s.length() < 4);
  • 48.
    
 Observable
 .just(5, 9, 10)
 .filter(i-> i % 3 > 0)
 .map(n -> "$" + n * 10)
 .filter(s -> s.length() < 4) .subscribe(System.out::println); QUIZ: What value(s) this observable will emit?
  • 49.
    Functions with sideeffects • doOnNext() • doOnError() • doOnComplete() • doOnEach() • doOnSubscribe() • doOnDispose() Affect environment outside the function.
  • 50.
    Error reporting Observer Observable onNext() onError() onComplete() Whenthe Observable or Flowable throws an exception it still invokes Observer.onError() or Subscriber.onError()
  • 51.
    Error-handling operators • onError()kills the subscription • retryWhen() - intercept and analyze the error; resubscribe • retryUntil() - retry until a certain condition is true • onErrorResumeNext() - used for failover to another Observable
  • 52.
  • 53.
  • 54.
    concat: combining observablesof the same type in order
  • 55.
  • 56.
    zip: combining observablesof different types
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
    Concurrency with Schedulers •subscribeOn(strategy) - run Observable in a separate thread: 
 Operations with side effects like files I/O;
 Don’t hold off the UI thread
 • observeOn(strategy) - run Observer in a separate thread,
 Update Swing UI on the Event Dispatch Thread
 Update Android UI on the Main thread
  • 64.
    Multi-threading strategies • Schedulers.computation()- for computations: # of threads <= # of cores • Schedulers.io() - for long running communications; backed by a thread pool • Schedulers.newThread() - new thread for each unit of work • Schedulers.from(Executor) - a wrapper for Java Executor • Scedulers.trampoline() - queues the work on the current thread • AndroidSchedulers.mainThread() - handle data on the main thread (RxAndroid)
  • 65.
    Switching threads Operator1() Operator2()ObserveOn() Observable Subscriber Thread 1 Thread 2 subscribeOn() observeOn()
  • 66.
    Multi-threaded processing withflatMap() Operator1() Operator2() flatMap() Observable Subscriber Thread 1 Observable/Thr2 Observable/Thr3 Observable/ThrN With flatMap() it’s easy to spawn a different thread to each observable …
  • 67.
    Turning a valueinto
 an observable Observable.range(1, 1000) .flatMap(n->Observable.just(n)
 .subscribeOn(Schedulers.computation()))
 .map(n->n*2)
 .observeOn(AndroidSchedulers.mainThread())
 .subscribe(myView.update()); Subscribing to each observable on the computation thread Displaying the result 
 on the main thread
  • 68.
  • 69.
    Parallel operations withParallelFlowabe • ParallelFlowable allows parallel execution of a few operators • The source sequence is dispatched into N parallel “rails” • More efficient forking and joining than with flatMap()
 runOn() —-> sequential() • Each rail can spawn multiple async threads with Schedulers
  • 70.
    Parallel operations withParallelFlowabe int numberOfRails = 4; // can query #processors with parallelism()
 
 ParallelFlowable
 .from(Flowable.range(1, 10), numberOfRails)
 .runOn(Schedulers.computation())
 .map(i -> i * i)
 .filter(i -> i % 3 == 0)
 .sequential()
 .subscribe(System.out::println); Demo: ParallelFlowableRange
  • 71.
    Types of Observables HotCold Push Produce the steam even
 if no-one cares r Produce the stream when 
 someone asks for it ✔
  • 72.
    Hot Observables • Mouse-generatedevents are emitted even if there are no subscribers • Button clicks • Stock prices are being published on a server socket regardless if any client is connected • A sensor in a mobile device sends signals even if no apps are handling them
  • 73.
  • 74.
    Using publish() andconnect() ConnectableObservable<Long> numbers = (ConnectableObservable<Long>) Observable
 .interval(1, TimeUnit.SECONDS) // generate seq numbers every second
 .publish(); // make it hot
 
 numbers.connect(); // creates an internal subscriber to start producing data
 numbers.subscribe(n ->System.out.println("First subscriber: "+ n ));
 
 Thread.sleep(3000);
 
 numbers.subscribe(n ->System.out.println(" Second subscriber: "+ n ));

  • 75.
  • 76.
    Summary • Observable: nobackpressue; no reactive streams spec • Flowable: backpressure; reactive streams spec • Operators can be chained • flatmap() used for handling observable of observables • Schedulers support multi-threading • subscribeOn()/observeOn() - switching between threads • ParallelFlowable - initiate parallel processing • Observables can be hot or cold
  • 77.
    Thank you! • Codesamples: 
 https://github.com/yfain/rxjava2 • Our company: faratasystems.com • My blog: yakovfain.com • Twitter: @yfain