Reactive Streams and RxJava2 Yakov Fain, Farata Systems
 
 yfain
About myself • Solutions Architect at Farata Systems • Java Champion • Latest book:
 “Angular 2 Development with TypeScript”

“If I had asked people what they wanted, they would have said faster horses”
 
 Henry Ford

Synchronous vs asynchronous • Synchronous programming is straightforward • Asynchronous programming dramatically increases complexity
Challenges in async programming • Handling failed async requests • Chaining async calls • The user already closed the view • Switching threads to update UI
and more challenges • Avoiding writing blocking code in a multi-threaded app • Several consumers • Composable functions • Killing a pending HTTP request
Backpressure • A publisher generates more data than a subscriber can process
Examples of backpressure • The stock price changes 100 times a second • Accelerometer produces 50 signals a second • Iteration through a JDBC result set (rxjava-jdbc; rxjava2-jdbc) • A user drags the mouse to draw a curve. Can’t apply backpressure.
Reactive Apps • The data is processed as streams • Message-driven: notifications • Resilient: stay alive in case of failures • Data flows through your app’s algorithms • Hide complexity of multi-threading
Reactive Streams • A spec for async stream processing with non-blocking backpressure: http://www.reactive-streams.org • Reactive Streams interfaces for JVM: https://github.com/reactive-streams/reactive-streams-jvm • Reactive Streams-based products: RxJava2, Java 9 Flow APIs, Reactor 3, Akka Streams, MongoDB, Vert.x …
Reactive Streams: Java interfaces, with backpressure support
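The Subscriber contract can be tried out with the JDK alone, because the Java 9 Flow API (listed above among the Reactive Streams-based products) mirrors the Reactive Streams interfaces. The following is a minimal sketch, not code from the talk: the class name `FlowInterfacesDemo` and the sample items are hypothetical, and `SubmissionPublisher` is used as a ready-made JDK publisher.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; only Flow.* and SubmissionPublisher come from the JDK.
final class FlowInterfacesDemo {

    /** Publishes the items and returns the signals the subscriber saw, in order. */
    static List<String> run(List<String> items) {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Flow.Subscriber<String> subscriber = new Flow.Subscriber<>() {
            private Flow.Subscription subscription;

            @Override public void onSubscribe(Flow.Subscription s) {
                subscription = s;
                log.add("onSubscribe");
                s.request(1);               // backpressure: ask for the first item only
            }
            @Override public void onNext(String item) {
                log.add("onNext(" + item + ")");
                subscription.request(1);    // ask for the next item when ready
            }
            @Override public void onError(Throwable t) {
                log.add("onError");
                done.countDown();
            }
            @Override public void onComplete() {
                log.add("onComplete");
                done.countDown();
            }
        };

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit);
        } // close() signals onComplete once the buffered items are delivered

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        run(List.of("IPA", "Stout")).forEach(System.out::println);
    }
}
```

The subscriber never receives more than it asked for: each `onNext` is answered with another `request(1)`.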
Main RxJava2 players • Observable or Flowable - producers of data • Observer or Subscriber - consumers of data • Subject or Processor - implements both producer and consumer • Operator - en-route data transformation • Scheduler - multi-threading support
Main publishers and subscribers in RxJava2

Observable (publisher, no backpressure support) with Observer (subscriber); not from Reactive Streams:

public interface Observer<T> {
    void onSubscribe(Disposable d);
    void onNext(T t);
    void onError(Throwable e);
    void onComplete();
}

Flowable (publisher, with backpressure support) with Subscriber (subscriber); from Reactive Streams:

public interface Subscriber<T> {
    void onSubscribe(Subscription s);
    void onNext(T t);
    void onError(Throwable t);
    void onComplete();
}
Creating an Observable • Observable.create() • Observable.fromArray() • Observable.fromIterable() • Observable.fromCallable() • Observable.fromFuture() • Observable.range() • Observable.just()

Observable.create(subscriber -> {
    int totalBeers = beerStock.size();
    for (int i = 0; i < totalBeers; i++) {
        // synchronous push
        subscriber.onNext(beerStock.get(i));
    }
    subscriber.onComplete();
});
Observable<Beer> observableBeer = Observable.create(/* data source */);

observableBeer
    .skip(1)
    .take(3)
    .filter(beer -> "USA".equals(beer.country))
    .map(beer -> beer.name + ": $" + beer.price)
    .subscribe(
        beer -> System.out.println(beer),
        err -> System.out.println(err),
        () -> System.out.println("Streaming is complete"),
        disposable -> System.out.println(
            "Someone just subscribed to the beer stream!!!")
    );

Diagram: the Observable pushes data to the Observer through a subscription.
Creating an Observer and subscribing

Observable<Beer> beerData = BeerServer.getData(); // returns an Observable

Observer<Beer> beerObserver = new Observer<Beer>() {

    public void onSubscribe(Disposable d) {
        System.out.println(" !!! Someone just subscribed to the beer stream!!! ");

        // If the subscriber is younger than 21, cancel the subscription
        // d.dispose();
    }

    public void onNext(Beer beer) {
        System.out.println(beer);
    }

    public void onError(Throwable throwable) {
        System.err.println("In Observer.onError(): " + throwable.getMessage());
    }

    public void onComplete() {
        System.out.println("*** The stream is over ***");
    }
};

beerData
    .subscribe(beerObserver); // Streaming starts here
Pushing HTTP responses (OkHttp: https://square.github.io/okhttp)

OkHttpClient client = new OkHttpClient();

Disposable disposable = Observable.<Response>create(emitter -> { // invoked on subscription
    try {
        Response response = client.newCall( // prepare the call for future execution
                new Request.Builder().url("http://localhost:8080/beers").build())
            .execute(); // use enqueue() for async requests

        if (response.isSuccessful()) {
            emitter.onNext(response);
            emitter.onComplete();
        } else {
            // signal the failure instead of completing; onError must not follow onComplete
            emitter.onError(new Exception("Can't get beers"));
        }
    } catch (IOException e) {
        emitter.onError(e);
    }
})
.subscribe(...); // pass onNext/onError lambdas (this overload returns a Disposable); use observeOn/subscribeOn to switch threads
Controlling the flow with request() request(1); request(1);
Handling backpressure • Subscriber to Publisher: request(1) • Publisher to Subscriber: onNext(value1) • Subscriber to Publisher: request(3) • Publisher to Subscriber: onNext(value2), onNext(value3), onNext(value4) • … • request() is non-blocking
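The request(1) / request(3) exchange can be sketched with a hand-written publisher built on the JDK Flow interfaces. This is a simplified, single-threaded illustration under stated assumptions: `RequestDemo`, `RangePublisher` and `Collector` are hypothetical names, the publisher is not thread-safe, and it does not validate `n` as the Reactive Streams specification requires.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo: a synchronous publisher that emits only what was requested.
final class RequestDemo {

    /** Publishes 1..count, never emitting more items than the subscriber requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;
        RangePublisher(int count) { this.count = count; }

        @Override public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand;
                private boolean emitting, done;

                @Override public void request(long n) {
                    if (done) return;
                    demand += n;                 // simplified: no validation of n
                    if (emitting) return;        // re-entrant call: the outer loop drains
                    emitting = true;
                    while (!done && demand > 0 && next <= count) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (!done && next > count) {
                        done = true;
                        subscriber.onComplete();
                    }
                }
                @Override public void cancel() { done = true; }
            });
        }
    }

    /** Stores received items and exposes the subscription so the caller drives demand. */
    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        Flow.Subscription subscription;
        boolean completed;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        Collector c = new Collector();
        new RangePublisher(10).subscribe(c);
        System.out.println(c.received);   // [] because nothing was requested yet
        c.subscription.request(1);
        System.out.println(c.received);   // [1]
        c.subscription.request(3);
        System.out.println(c.received);   // [1, 2, 3, 4]
    }
}
```

Because `request()` only records demand and returns, the subscriber decides the pace and the publisher never pushes unrequested items.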
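BackpressureStrategy.BUFFER: buffers all onNext values until the subscriber consumes them
BackpressureStrategy.DROP: drops the most recent onNext value if the subscriber can't keep up
BackpressureStrategy.LATEST: keeps only the latest onNext value, overwriting the previous one, if the subscriber can't keep up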
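The difference between the three strategies is easiest to see on a toy model. The sketch below is not RxJava code: `BackpressureModel` is a hypothetical, single-threaded simulation in which a producer emits 1..produced while the consumer has demand for only the first `demand` items, and then the consumer catches up and drains whatever was retained.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical single-threaded model of BUFFER, DROP and LATEST; not the RxJava implementation.
final class BackpressureModel {

    enum Strategy { BUFFER, DROP, LATEST }

    /** Returns everything the consumer eventually receives under the given strategy. */
    static List<Integer> simulate(Strategy strategy, int produced, int demand) {
        List<Integer> received = new ArrayList<>();
        Deque<Integer> pending = new ArrayDeque<>();    // items waiting for demand

        for (int item = 1; item <= produced; item++) {
            if (received.size() < demand) {
                received.add(item);                     // demand available: deliver at once
                continue;
            }
            switch (strategy) {
                case BUFFER:                            // keep every item
                    pending.addLast(item);
                    break;
                case DROP:                              // discard the new item
                    break;
                case LATEST:                            // keep only the newest item
                    pending.clear();
                    pending.addLast(item);
                    break;
            }
        }
        received.addAll(pending);                       // consumer catches up and drains
        return received;
    }

    public static void main(String[] args) {
        for (Strategy s : Strategy.values()) {
            System.out.println(s + ": " + simulate(s, 6, 2));
        }
    }
}
```

With six produced items and demand for two, BUFFER yields all six, DROP yields only the first two, and LATEST yields the first two plus the last item.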
Creating a Flowable • Flowable.create() • Flowable.fromArray() • Flowable.fromIterable() • Flowable.fromCallable() • Flowable.empty() • Flowable.range() • Flowable.just()
Flowable.create() and Observable.toFlowable()

Create:
Flowable<Beer> myFlowable = Flowable.create(beerEmitter -> {…}, BackpressureStrategy.BUFFER);

Convert from Observable:
myObservable.toFlowable(BackpressureStrategy.BUFFER);
Requesting data from Flowable

import io.reactivex.Flowable;
import io.reactivex.subscribers.DisposableSubscriber;

public class FlowableRange {

    static DisposableSubscriber<Integer> subscriber;

    public static void main(String[] args) {

        subscriber = new DisposableSubscriber<Integer>() {

            public void onStart() {
                request(5);

                // Emulate 1-sec processing; stop asking once the subscription is disposed
                while (!isDisposed()) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    request(1);
                }
            }

            public void onNext(Integer t) {
                System.out.println("processing " + t);
                if (t == 8) { // just to demo unsubscribing
                    subscriber.dispose();
                }
            }

            public void onError(Throwable thr) {
                System.err.println("In onError(): " + thr.getMessage());
            }

            public void onComplete() {
                System.out.println("Done");
            }
        };

        Flowable.range(1, 10)
            .subscribe(subscriber);
    }
}
Demo 
 1. FlowableRange
 2. BeerClientFlowable Code samples: https://github.com/yfain/rxjava2
An Operator: Observable -> a transforming function -> Observable

observableBeer
    .filter(b -> "USA".equals(b.country))

Docs: http://reactivex.io/documentation/operators
QUIZ: What value(s) does this observable emit?

Observable
 .just(5, 9, 10) // emits 5, 9, 10
 
 .filter(i -> i % 3 > 0)
 
 .map(n -> "$" + n * 10)
 
 .filter(s -> s.length() < 4);
Observable
 .just(5, 9, 10) // emits 5, 9, 10
 
 .filter(i -> i % 3 > 0)
 
 .map(n -> "$" + n * 10)
 
 .filter(s -> s.length() < 4) 
 .subscribe(System.out::println);
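The quiz pipeline can be checked without RxJava by running the same filter/map/filter lambdas through `java.util.stream`. This is a stand-in for the Observable chain, not RxJava code; `QuizCheck` is a hypothetical class name.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: the quiz lambdas applied with java.util.stream instead of RxJava.
final class QuizCheck {

    static List<String> emitted() {
        return Stream.of(5, 9, 10)
                .filter(i -> i % 3 > 0)         // 9 is divisible by 3 and is removed
                .map(n -> "$" + n * 10)         // "$50", "$100"
                .filter(s -> s.length() < 4)    // "$100" has 4 characters and is removed
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(emitted());          // [$50]
    }
}
```

Only "$50" survives both filters.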
Composing observables
merge: combining observables
concat: combining observables in order
zip: combining observables of different types
flatMap
switchMap
Types of Observables • Hot (push): produces a stream even if no one cares • Cold: produces a stream when someone asks for it
Hot Observables • Mouse events • Publishing stock prices • An accelerometer in a smartphone
Making observables hot
Turning a cold observable into hot

ConnectableObservable<Long> numbers = Observable
    .interval(1, TimeUnit.SECONDS) // generate sequential numbers every second
    .publish();                    // make it hot

numbers.connect(); // creates an internal subscriber to start producing data
numbers.subscribe(n -> System.out.println("First subscriber: " + n));

Thread.sleep(3000);

numbers.subscribe(n -> System.out.println(" Second subscriber: " + n));
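The defining trait of a hot source, that a late subscriber misses what was already emitted, can be reproduced with the JDK's `SubmissionPublisher`, which delivers each item only to the subscribers registered at the time it is submitted. This is a sketch using the Flow API rather than RxJava's ConnectableObservable; `HotPublisherDemo` and `Collector` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of hot behavior with the JDK Flow API (not RxJava).
final class HotPublisherDemo {

    static final class Collector implements Flow.Subscriber<Integer> {
        final List<Integer> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);

        @Override public void onSubscribe(Flow.Subscription s) { s.request(Long.MAX_VALUE); }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { done.countDown(); }
        @Override public void onComplete() { done.countDown(); }
    }

    /** Returns the items seen by the first subscriber and by the late subscriber. */
    static List<List<Integer>> run() {
        Collector first = new Collector();
        Collector second = new Collector();

        try (SubmissionPublisher<Integer> numbers = new SubmissionPublisher<>()) {
            numbers.subscribe(first);
            numbers.submit(1);
            numbers.submit(2);
            numbers.subscribe(second);   // late subscriber: 1 and 2 are already gone
            numbers.submit(3);
        } // close() completes both subscribers after pending items are delivered

        try {
            if (!first.done.await(5, TimeUnit.SECONDS) || !second.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return List.of(new ArrayList<>(first.received), new ArrayList<>(second.received));
    }

    public static void main(String[] args) {
        System.out.println(run());       // [[1, 2, 3], [3]]
    }
}
```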

Demo 
 HotObservable
Schedulers
Concurrency with Schedulers • subscribeOn(strategy) - run the Observable in a separate thread:
 Operations with side effects like file I/O;
 Don’t block the UI thread
 • observeOn(strategy) - run the Observer in a separate thread:
 Update Swing UI on the Event Dispatch Thread;
 Update Android UI on the Main thread
Multi-threading strategies • Schedulers.computation() - for computations: # of threads <= # of cores • Schedulers.io() - for long running communications; backed by a thread pool • Schedulers.newThread() - new thread for each unit of work • Schedulers.from(Executor) - a wrapper for Java Executor • Schedulers.trampoline() - queues the work on the current thread • AndroidSchedulers.mainThread() - handle data on the main thread (RxAndroid)
Switching threads: with subscribeOn(), the Observable, Operator1() and Operator2() run on Thread 1; after observeOn(), the Subscriber runs on Thread 2
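The idea of producing on one thread and consuming on another can be illustrated with plain JDK classes. The sketch below uses `CompletableFuture` with two named single-thread executors as an analogy to subscribeOn()/observeOn(); it is not RxJava code, and `ThreadSwitchDemo` and the thread names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical JDK-only analogy to subscribeOn()/observeOn(); not RxJava code.
final class ThreadSwitchDemo {

    /** Returns {name of the producing thread, name of the consuming thread}. */
    static String[] run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-thread"));
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "ui-thread"));
        String[] threads = new String[2];
        try {
            CompletableFuture
                .supplyAsync(() -> {                 // produce the data, like subscribeOn(io)
                    threads[0] = Thread.currentThread().getName();
                    return "beer data";
                }, io)
                .thenAcceptAsync(data -> {           // consume it elsewhere, like observeOn(ui)
                    threads[1] = Thread.currentThread().getName();
                }, ui)
                .join();                             // wait for both stages to finish
        } finally {
            io.shutdown();
            ui.shutdown();
        }
        return threads;
    }

    public static void main(String[] args) {
        System.out.println(String.join(" -> ", run()));   // io-thread -> ui-thread
    }
}
```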
Multi-threaded processing with flatMap(): the Observable, Operator1() and Operator2() run on Thread 1; flatMap() spawns a separate thread for each inner observable (Observable/Thr2, Observable/Thr3, … Observable/ThrN) before the results reach the Subscriber
Turning a value into an observable

Observable.range(1, 1000)
    .flatMap(n -> Observable.just(n)               // turn each value into an observable
        .subscribeOn(Schedulers.computation()))    // subscribe to each observable on a computation thread
    .map(n -> n * 2)
    .observeOn(AndroidSchedulers.mainThread())     // switch to the main thread
    .subscribe(n -> myView.update(n));             // update the UI (assumes myView.update() accepts the value)
Demo 
 schedulers/SubscribeOnObserveOn
 
 ParallelStreams
Parallel operations with ParallelFlowable • Parallel execution of some operators
 • The source sequence is dispatched into N parallel “rails” • runOn() -> sequential() is a more efficient fork/join than flatMap()
 • Each rail can spawn multiple async threads with Schedulers
Parallel Execution (ParallelFlowableRange.java)

int numberOfRails = 4; // can query #processors with parallelism()

ParallelFlowable
    .from(Flowable.range(1, 10), numberOfRails)
    .runOn(Schedulers.computation())
    .map(i -> i * i)
    .filter(i -> i % 3 == 0)
    .sequential()
    .subscribe(System.out::println);

Tasks run simultaneously on different CPUs or computers.
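The same fork/join shape (split the range, square and filter in parallel, merge the results) can be tried with a JDK parallel stream. This is a stand-in for ParallelFlowable, not RxJava code: `ParallelSquares` is a hypothetical name, and unlike a parallel stream collected into a list, the order in which RxJava merges its rails may differ from the source order.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical JDK-only stand-in for the ParallelFlowable pipeline.
final class ParallelSquares {

    static List<Integer> run() {
        return IntStream.rangeClosed(1, 10)      // same values as Flowable.range(1, 10)
                .parallel()                      // fork: split the range across worker threads
                .map(i -> i * i)
                .filter(i -> i % 3 == 0)
                .boxed()
                .collect(Collectors.toList());   // join: merged in encounter order
    }

    public static void main(String[] args) {
        System.out.println(run());               // [9, 36, 81]
    }
}
```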
Summary • Observable: no backpressure support • Flowable: backpressure support • Operators can be chained • flatMap() is used for handling an observable of observables • Schedulers support multi-threading • subscribeOn()/observeOn() - switching between threads • ParallelFlowable - initiates parallel processing
Links • Slides: http://bit.ly/2q3Ovt4 • Code samples: 
 https://github.com/yfain/rxjava2 • Our company: faratasystems.com • Blog: yakovfain.com • Twitter: @yfain

