Isomorphic Reactive Programming Giorgio Natili
@giorgionatili#MobileTea + @DroidconBos About Me • Engineering Manager at Akamai (NG2 on desktop and mobile) • Founder of Mobile Tea • Organizer of the Hash Conf • Organizer of DroidCon Boston
Mobile Tea
Hash Conf http://hashconf.io/
Droidcon Boston bitly.com/dcbos18
@giorgionatili#MobileTea + @DroidconBos Agenda • Fundamentals of Reactive Programming • Fundamentals of Reactive Extensions • Observables, Observers and Subjects • Reactive Extensions Operators • Parallel Programming
Reactive Programming
@giorgionatili#MobileTea + @DroidconBos Foundation • Programming paradigm • Computation is affected by signals • No notion of event scope • It favors function composition
@giorgionatili#MobileTea + @DroidconBos Elasticity (in contrast to Scalability) • A system scales up or down automatically to meet demand • Elasticity builds upon scalability adding the notion of automatic resource management
@giorgionatili#MobileTea + @DroidconBos Failure (in contrast to Error) • A failure is an unexpected event that prevents a system from continuing to function normally • This is in contrast with an error, which is an expected and coded-for condition
@giorgionatili#MobileTea + @DroidconBos Isolation (and Containment) • Isolation can be defined in terms of decoupling in time and space • Decoupling in time means that the sender and receiver can have independent life-cycles • Decoupling in space means that the sender and receiver do not have to run in the same process
@giorgionatili#MobileTea + @DroidconBos Message-Driven (in contrast to Event-Driven) • A message is an item of data that is sent to a specific destination • Notification listeners are attached to event sources such that get invoked when the event is emitted
@giorgionatili#MobileTea + @DroidconBos Back-Pressure • When one component is struggling to keep-up, the system as a whole needs to respond in a sensible way • Back-pressure is an important feedback mechanism that allows systems to gracefully respond to load rather than collapse under it
@giorgionatili#MobileTea + @DroidconBos Signals • Functional programming ties signals together in a dependency graph • Each signal is either an external input or a derived signal • Signals feeds off of other signals that have already been defined
@giorgionatili#MobileTea + @DroidconBos Advantages • Efficiency, by minimizing the amount of the computation that must be rerun when inputs change (e.g. history) • Dynamism by reconfiguring the computation over time, as the inputs to your system change • Ease of reasoning due to a paradigm that has a clean semantics
@giorgionatili#MobileTea + @DroidconBos Streams • Data • I/O operations • User interactions
@giorgionatili#MobileTea + @DroidconBos Streams and Async Code • Asynchronous code allows independent IO operations to run concurrently • Streams can be encapsulated in async code and improve code efficiency and readability
@giorgionatili#MobileTea + @DroidconBos Callback Hell
@giorgionatili#MobileTea + @DroidconBos Get Out of the Callback Hell • Abstractions that allow us to express the effect of latency in asynchronous computations • Encapsulate event-handling code, and use higher- order functions such as map, reduce, and filter • Compose clean and readable asynchronous code
@giorgionatili#MobileTea + @DroidconBos Warning • Reactive code composed with higher-order functions is often not purely functional • Functional reactive code typically involves side effects through network or file system IO
@giorgionatili#MobileTea + @DroidconBos Functions
@giorgionatili#MobileTea + @DroidconBos Composition and Currying • A mechanism to combine simple functions to build more complicated ones • Closely related to currying that instead translate a function with multiple arguments into multiple functions accepting one argument
@giorgionatili#MobileTea + @DroidconBos Pure Functions • In a pure functional language a function can only read what is supplied to it in its arguments • The only way it can have an effect on the world is through the values it returns
@giorgionatili#MobileTea + @DroidconBos Monads “Monads are monoids in the category of endofunctors”
@giorgionatili#MobileTea + @DroidconBos Monoids • Are the most basic way of combining values together • Don’t necessarily have to combine containers of values • Can combine any values with certain properties • Support associativity and neutral elements (aka 0)
@giorgionatili#MobileTea + @DroidconBos Monads • Elegant abstractions for describing how state is manipulated in a pure functional programming language • Monads apply a function that returns a wrapped value to a wrapped value
@giorgionatili#MobileTea + @DroidconBos Monads Operations • The return operation that promotes regular values of type A to monads M[A] • The bind operation that takes a function A => M[B] that is used to transform underlying values
@giorgionatili#MobileTea + @DroidconBos Practically Speaking • Monads are wrappers around function invocations • There are a bunch of different Monads with identical syntax but different semantics • Wrappers that do IO, Exceptions, Nullability, State, Logging, Messaging, etc.
Optional<String> maybeFirstName = Optional.of("Joe"); Optional<String> maybeLastName = Optional.of("Black"); Optional<String> maybeFullName = maybeFirstName.flatMap(firstName -> maybeLastName.map(lastName -> firstName + " " + lastName) ); Monads in Java
@giorgionatili#MobileTea + @DroidconBos Advantages • Interfacing pure functional programming to the impure dysfunctional world • Interacting with a stateful system without compromising functional programming semantic
@giorgionatili#MobileTea + @DroidconBos Stateful and Stateless • Stateful means the system keeps track of the state of interaction by setting values in a sort of storage • Stateless means there is no record of previous interactions and each interaction request has to be handled based entirely on information that comes with it
@giorgionatili#MobileTea + @DroidconBos Functors • When a value is wrapped in a context, you can't apply a normal function to it • A Functor is any data type that defines how functions like map applies to it • A functor is simply something that can be mapped over
@giorgionatili#MobileTea + @DroidconBos Functors and map fun sumThree(n: Int) = n + 3 Option.Some(2).map(::sumThree) // => Some(5) func plusThree(addend: Int) -> Int { return addend + 3 } Optional.Some(2).map(plusThree) // => .Some(5)
@giorgionatili#MobileTea + @DroidconBos Recap • A functor is a type, denoted by Type<T>, that: • wraps another inner type (like Array<T> or Optional<T> ) • has a method map with the signature (T->U) -> Type<U> • A monad is a type that: • is a functor (so it has an inner type T and a map method) • also has a method flatMap with the signature 
 (T->Type<U>) -> Type<U>
Reactive Programming and Reactive Extensions
@giorgionatili#MobileTea + @DroidconBos Reactive Extensions • It is a set of tools allowing imperative programming languages to operate on sequences of data • It provides a set of sequence operators that operate on each item in the sequence
@giorgionatili#MobileTea + @DroidconBos Functional Reactive Programming (FRP) • It’s just another a programming paradigm • It focuses on reacting to streams of changes (data and events) • It favors function composition, avoidance of global state and side effects
@giorgionatili#MobileTea + @DroidconBos Observer Pattern and Manipulation • FRP is based on the Observer pattern • FRP supports manipulating and transforming the stream of data that Observables emit
@giorgionatili#MobileTea + @DroidconBos Lambdas • A lambda expression is an anonymous function that you can use to create delegates or expression tree types • Lambda functions are often arguments being passed to higher-order functions
@giorgionatili#MobileTea + @DroidconBos Subject • A Subject is a sort of bridge that is available in most of the implementations of ReactiveX • It acts both as an observer and as an Observable • It can subscribe to one or more Observables, • Because it is also an Observable, it can pass through the items it observes by re-emitting them
@giorgionatili#MobileTea + @DroidconBos Observables • onNext, an Observable calls this method whenever the Observable emits an item • onError, an Observable calls this method to indicate that it has failed to generate the expected data • onCompleted, an Observable calls this method after it has called onNext for the final time
@giorgionatili#MobileTea + @DroidconBos Push vs Pull • Observable is push based, it decides when to emit values • Iterable is pull based, it sits until the some ask for the next() value
@giorgionatili#MobileTea + @DroidconBos Observable Pipelines Observable<String> sentenceObservable = 
 Observable.from("hello", "world"); Observable.subscribe(new Action1<String>() {
 @Override public void call(String s) { System.out.println(s); } });
@giorgionatili#MobileTea + @DroidconBos Observing Subjects • A professor is an observable • A student is an observer • A Subject emits all the subsequent items of the source Observable at the time of subscription
@giorgionatili#MobileTea + @DroidconBos Publish Subject 
 “It emits all the subsequent items of the source Observable at the time of subscription”
@giorgionatili#MobileTea + @DroidconBos 
 “It emits all the items of the source Observable, regardless of when the subscriber subscribes” Replay Subject
@giorgionatili#MobileTea + @DroidconBos Behavior Subject 
 “It emits the most recently emitted item and all the subsequent items of the source Observable when an observer subscribes to it”
@giorgionatili#MobileTea + @DroidconBos Async Subject 
 “It only emits the last value of the source Observable (and only the last value)”
Creating Observables
@giorgionatili#MobileTea + @DroidconBos Create 
 “By using the Create operator, you pass this operator a function that accepts the observer as its parameter”
@giorgionatili#MobileTea + @DroidconBos Defer 
 “Do not create the Observable until the observer subscribes, and create a fresh Observable for each observer”
@giorgionatili#MobileTea + @DroidconBos From 
 “Convert various other objects and data types into Observables”
Observable.create(new Observable.OnSubscribe<Integer>() { @Override public void call(Subscriber<? super Integer> observer) { try { if (!observer.isUnsubscribed()) { for (int i = 1; i < 5; i++) { observer.onNext(i); } observer.onCompleted(); } } catch (Exception e) { observer.onError(e); } } } ).subscribe(new Subscriber<Integer>() { @Override public void onNext(Integer item) { System.out.println("Next: " + item); } @Override public void onError(Throwable error) { System.err.println("Error: " + error.getMessage()); } @Override public void onCompleted() { System.out.println("Sequence complete."); } });
@giorgionatili#MobileTea + @DroidconBos Just 
 “Converts an object or a set of objects into an Observable that emits that or those objects”
@giorgionatili#MobileTea + @DroidconBos Interval 
 “Creates an Observable that emits a sequence of integers spaced by a given time interval”
let source = Rx.Observable .interval(500 /* ms */) .timeInterval() .take(3); let subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
@giorgionatili#MobileTea + @DroidconBos Timer 
 “Creates an Observable that emits a particular item after a given delay”
Transforming Operators
@giorgionatili#MobileTea + @DroidconBos Map 
 Transforms the items emitted by an Observable by applying a function to each item
@giorgionatili#MobileTea + @DroidconBos FlatMap 
 Transforms the items emitted by an Observable into Observables, then flatten the emissions from those into a single Observable
@giorgionatili#MobileTea + @DroidconBos Scan 
 Applies a function to each item emitted by an Observable, sequentially, and emit each successive value
@giorgionatili#MobileTea + @DroidconBos Buffer 
 Periodically gather items emitted by an Observable into bundles and emit these bundles rather than emitting the items one at a time
@giorgionatili#MobileTea + @DroidconBos Window 
 Periodically subdivide items from an Observable into Observable windows and emit these windows rather than emitting the items one at a time
ViewObservable.clicks(view) .map { System.currentTimeMillis() } .slidingWindow(3) .filter { it.get(2) - it.get(0) < 500 } .subscribe { println("Triple tap!") } )
Filtering Operators
@giorgionatili#MobileTea + @DroidconBos Filter 
 “Emits only those items from an Observable that pass a predicate test”
@giorgionatili#MobileTea + @DroidconBos First 
 “Emits only the first item (or the first item that meets some condition) emitted by an Observable”
@giorgionatili#MobileTea + @DroidconBos Last 
 “Emits only the last item (or the last item that meets some condition) emitted by an Observable”
@giorgionatili#MobileTea + @DroidconBos Find 
 “Emits only the items emitted by an Observable that meet a condition expressed as a function”
@giorgionatili#MobileTea + @DroidconBos Skip 
 “Suppress the first n items emitted by an Observable”
let source = Rx.Observable.timer(0, 1000) .skipUntilWithTime(5000); let subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
@giorgionatili#MobileTea + @DroidconBos Take 
 “Emits only the first n items emitted by an Observable”
@giorgionatili#MobileTea + @DroidconBos Distinct 
 “Suppresses duplicate items emitted by an Observable”
Combining Operators
@giorgionatili#MobileTea + @DroidconBos Merge 
 “Combines multiple Observables into one by merging their emissions”
@giorgionatili#MobileTea + @DroidconBos Zip “Combines the emissions of multiple Observables together using function and emit single items for each combination based on the results of this function”
/* Using arguments */ var range = Rx.Observable.range(0, 5); var source = Observable.zip( range, range.skip(1), range.skip(2), function (s1, s2, s3) { return s1 + ':' + s2 + ':' + s3; } ); var subscription = source.subscribe( function (x) { console.log('Next: ' + x); }, function (err) { console.log('Error: ' + err); }, function () { console.log('Completed'); });
@giorgionatili#MobileTea + @DroidconBos Join 
 “Combines the items emitted by two Observables, and selects the items to combine upon the duration-windows defined on a per-item basis”
@giorgionatili#MobileTea + @DroidconBos Concat 
 “Concatenates the output of multiple Observables so that they act like a single Observable”
Parallelization
@giorgionatili#MobileTea + @DroidconBos Achieving Parallelization • A common question about Reactive Extensions is how to achieve parallelization, or emitting multiple items concurrently from an Observable • You can achieve parallelization in RxJava without breaking the Observable contract
@giorgionatili#MobileTea + @DroidconBos Schedulers • immediate(), creates and returns a Scheduler that executes work immediately on the current thread • trampoline(), creates and returns a Scheduler that queues work on the current thread to be executed after the current work completes • newThread(), creates and returns a Scheduler that creates a new Thread for each unit of work • computation(), creates and returns a Scheduler intended for computational work • io(),creates and returns a Scheduler intended for IO-bound work (unbounded, multiple threads)
@giorgionatili#MobileTea + @DroidconBos subscribeOn The method subscribeOn() allows you to move the execution of the Observable code into another thread and with an specific behavior
@giorgionatili#MobileTea + @DroidconBos flatMap The flatMap() has to merge emissions from multiple Observables happening on multiple threads, but it cannot allow concurrent onNext() calls to happen down the chain including the Subscribe
Observable<Integer>vals = Observable.range(1,10); vals.flatMap(val -> Observable.just(val) .subscribeOn(Schedulers.computation()) .map(i -> intenseCalculation(i)) ).toList() .subscribe(val -> System.out.println("Subscriber received " + val + " on " + Thread.currentThread().getName())); Calculating 4 on RxComputationThreadPool-2 Calculating 1 on RxComputationThreadPool-3 Calculating 2 on RxComputationThreadPool-4 Calculating 3 on RxComputationThreadPool-1 Calculating 7 on RxComputationThreadPool-1 Calculating 6 on RxComputationThreadPool-4 Calculating 5 on RxComputationThreadPool-3 Calculating 10 on RxComputationThreadPool-4 Calculating 8 on RxComputationThreadPool-2 Calculating 9 on RxComputationThreadPool-3 Subscriber received [3, 2, 1, 6, 4, 7, 10, 5, 8, 9] on RxComputationThreadPool-3 Parallelization with RxJava
What’s About Swift?
What’s About JavaScript?
Questions?
–Giorgio Natili “Keep calm and react gracefully to asynchronous system events…”

Isomorphic Reactive Programming