RxJS Reactive extensions for JavaScript
$ whoami
    Viliam Elischer
    SW Engineer at ZOOM International
    twitter.com/vireliam
    github.com/vire
Agenda
    Introduction
    Theory
    Building blocks
    Usage
    Alternatives
    Resources
History
    Originally project Volta (December 2007)
    Erik Meijer: Volta is an experiment that enables Microsoft to explore new ways of developing distributed applications.
    Today ReactiveX supports RxJava, RxJS, Rx.NET, UniRx, RxScala, RxClojure, RxCpp, Rx.rb, RxPY, RxGroovy, RxJRuby, RxKotlin, RxSwift, RxNetty, RxAndroid, RxCocoa
    Spiritual fathers: Erik Meijer, Matthew Podwysocki
    RxJS contributors: Ben Lesh, Paul Daniels + 164 more individuals
RxJS is a set of libraries for composing asynchronous and event-based programs.
Developers represent asynchronous data streams with Observables, query asynchronous data streams using many operators, and parameterize the concurrency in the asynchronous data streams using Schedulers.
Asynchronous data streams demystified
A Stream is just a fancy word for a sequence of “things” streamed over time.
A sequence is infinite until it completes. It is not possible to add anything after completion.
The act of change on a sequence is simply an event.
A Stream is an infinite sequence of events over time.
A Sequence can be observed and acts as a Subject of observation.
An Observable sequence.
The ability to observe exists because someone has a reason to be interested in the sequence.
An Observer is interested in what happens with the Subject (Observable).
To get notified about changes an Observer must subscribe to an Observable sequence.
A Subscription represents the contract between Observer and Observable.
The Subject notifies the Observer about changes by pushing events.
The Observer represents behavior for handling events pushed by the observed sequence.
This prepared reaction to a push happens asynchronously.
Freeing Observables from managing the “when” aspect is possible thanks to Schedulers.
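As a rough illustration of the “when” aspect, the sketch below hands a scheduler to a hand-rolled producer. The names `immediateScheduler`, `createQueueScheduler` and `fromArrayOn` are hypothetical and are not the RxJS Scheduler API; they only show that the producer decides what to emit while the scheduler decides when it is delivered.

```javascript
// Hypothetical mini-schedulers; NOT the RxJS Scheduler API.
// A scheduler is anything that decides when a unit of work runs.
const immediateScheduler = {
  schedule(work) { work(); } // run right now
};

function createQueueScheduler() {
  const queue = [];
  return {
    schedule(work) { queue.push(work); }, // remember the work for later
    flush() {                             // run everything that was queued
      while (queue.length > 0) queue.shift()();
    }
  };
}

// The producer only knows what to emit; the scheduler decides when.
function fromArrayOn(values, scheduler) {
  return {
    subscribe(onNext) {
      values.forEach(value => scheduler.schedule(() => onNext(value)));
    }
  };
}

const syncLog = [];
fromArrayOn([1, 2, 3], immediateScheduler).subscribe(v => syncLog.push(v));
// syncLog already holds all three values here

const queued = createQueueScheduler();
const deferredLog = [];
fromArrayOn([1, 2, 3], queued).subscribe(v => deferredLog.push(v));
const lengthBeforeFlush = deferredLog.length; // nothing delivered yet
queued.flush();                               // now the values are delivered
```

The same producer code runs in both cases; only the scheduler argument changes the timing.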
The most important features of Observables come into play when sophisticated operations are expressed as queries.
Computation scenarios
    time / val             One value    Multiple values
    Synchronous | Pull     Object       Iterables (Array | Map | Set | Object)
    Asynchronous | Push    Promise      Observables
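The pull and push rows of the table can be sketched in plain JavaScript. This is only an illustration: `pushValues` is a hypothetical helper, not an RxJS function, and the push side is delivered synchronously here to keep the example short.

```javascript
// Pull: the consumer asks for the next value when it is ready.
const iterator = [1, 2, 3][Symbol.iterator]();
const pulled = [];
let step = iterator.next();
while (!step.done) {
  pulled.push(step.value);
  step = iterator.next();
}

// Push: the producer calls the consumer whenever a value is available.
// `pushValues` is a hypothetical helper, not part of RxJS.
function pushValues(values, onNext, onCompleted) {
  values.forEach(value => onNext(value));
  onCompleted();
}

const pushed = [];
pushValues(
  [1, 2, 3],
  value => pushed.push(value),
  () => pushed.push('done')
);
```

In the pull case the loop controls the pace; in the push case the consumer only supplies callbacks and waits to be called.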
Design patterns
Faig Ahmed - Liquid, Handmade Woolen carpet 200x290, year 2014
GOF Design Patterns
    Iterator: IEnumerable<T> and IEnumerator<T>
        interactive computations where the consumer synchronously pulls data from the producer
    Observer: IObservable<T> and IObserver<T>
        reactive computations where the producer asynchronously pushes data to the consumer
    http://csl.stanford.edu/~christos/pldi2010.fit/meijer.duality.pdf
    http://josemigueltorres.net/index.php/ienumerableiobservable-duality/
Asynchronously? Sure, we have promises...
Problematic aspects of promises
    Swallowed errors
    Bulk processing (map 20 XHRs to 1 result)
    Cancellation (context switching)
    Replay of previous values
Building blocks
    function Disposable() { }
    Disposable.prototype.dispose = () => { ... }

    function Observable() { }
    Observable.prototype.subscribe = observer => { ... }

    function Observer() { }
    Observer.prototype.onNext = value => { ... };
    Observer.prototype.onError = error => { ... };
    Observer.prototype.onCompleted = () => { ... };
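To make the three building blocks concrete, here is a minimal sketch of how they could fit together. `createObservable` is a hypothetical helper written for this example; it is not how RxJS is implemented, and it leaves out operators, schedulers and error handling inside the producer.

```javascript
// Hypothetical miniature of Observable / Observer / Disposable; RxJS is far richer.
function createObservable(subscribeFn) {
  return {
    subscribe(onNext, onError, onCompleted) {
      let stopped = false;
      // Wrap the callbacks so nothing is delivered after termination or disposal.
      const observer = {
        onNext(value) { if (!stopped) onNext(value); },
        onError(error) {
          if (!stopped) { stopped = true; if (onError) onError(error); }
        },
        onCompleted() {
          if (!stopped) { stopped = true; if (onCompleted) onCompleted(); }
        }
      };
      const cleanup = subscribeFn(observer);
      // The returned object plays the role of the Disposable.
      return {
        dispose() {
          stopped = true;
          if (typeof cleanup === 'function') cleanup();
        }
      };
    }
  };
}

const events = [];
const source = createObservable(observer => {
  observer.onNext(42);
  observer.onCompleted();
  observer.onNext(43); // ignored: the sequence has already completed
  return () => events.push('cleanup');
});

const subscription = source.subscribe(
  value => events.push('onNext: ' + value),
  error => events.push('onError: ' + error.message),
  () => events.push('onCompleted')
);
subscription.dispose();
// events: ['onNext: 42', 'onCompleted', 'cleanup']
```

The `stopped` flag is what enforces the rule from the theory slides: nothing can be added to a sequence after it completes.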
RxJS = Observables + Operators + Schedulers.
It’s all about operators...
Operator, I need an operator.
We do have operators
131
creation, conversion, combine, functional, mathematical, time, exceptions, miscellaneous, selection and primitives
Creation - Observable.just | .return
    Rx.Observable.just(42)
      .subscribe(val => { console.log('val: ' + val); });
    // val: 42
Creation - Observable.create
    const source = Rx.Observable.create(observer => {
      observer.onNext(42);
      observer.onError(new Error('ha-ha!'));
      observer.onCompleted(); // never delivered: the error already terminated the sequence
    });
    source.subscribe(
      val => console.log('onNext:', val),
      error => console.log('onError:', error),
      () => console.log('onCompleted:')
    );
Creation - Observable.create + Disposable
    const source = Rx.Observable.create(observer => {
      observer.onNext(42);
      observer.onError(new Error('ha-ha!'));
      observer.onCompleted(); // never delivered: the error already terminated the sequence
      // the returned Disposable runs when the subscription is torn down
      return Rx.Disposable.create(() => console.log('disposed'));
    });
    source.subscribe(
      val => console.log('onNext:', val),
      error => console.log('onError:', error),
      () => console.log('onCompleted:')
    );
Creation - Observable.create + Disposable
    const asyncOp = () => {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          console.log('resolving!');
          resolve(42);
        }, 1000);
        console.log('promise in progress...');
      });
    };
    asyncOp().then(val => console.log('resolved with: ', val));
Promises can’t be disposed!
    // Promise: once started, the timer runs and the promise resolves no matter what
    const asyncOp = () => {
      return new Promise((resolve, reject) => {
        setTimeout(() => {
          console.log('resolving!');
          resolve(42);
        }, 1000);
        console.log('promise in progress...');
      });
    };
    asyncOp().then(val => console.log('resolved', val));

    // Observable: disposing the subscription clears the timer, so onNext never fires
    const source = Rx.Observable.create(observer => {
      console.log('Observable in progress');
      const timeout = setTimeout(() => {
        observer.onNext('onNext after two seconds!');
      }, 2000);
      return Rx.Disposable.create(() => {
        console.log('disposed');
        clearTimeout(timeout);
      });
    });
    const subscription = source.subscribe(
      val => console.log('onNext:', val)
    );
    setTimeout(() => subscription.dispose(), 500);
Creation - Observable.from
    Arguments, Array, Iterable, Set, Map, String, Generated sequence
    Rx.Observable.from([1, 2, 3], x => x + x).subscribe(
      x => console.log(`onNext: ${x}`),
      e => console.log(`onError: ${e}`),
      () => console.log('onCompleted')
    );
    // ! 2nd argument is a mapFn
    // => onNext: 2
    // => onNext: 4
    // => onNext: 6
    // => onCompleted
Creation - Observable.from${PutHereWhatYouHave}
    .fromCallback, .fromPromise, .fromEvent, .fromNodeCallback
    const firstNameInput = document.querySelector('#firstName');
    Rx.Observable.fromEvent(firstNameInput, 'input')
      .pluck('target', 'value')
      .subscribe(e => console.log(e));
Hot ‘n cold
    // Cold: each subscriber gets its own run of the sequence
    const source = Rx.Observable.interval(1000)
      .map(val => val + 1)
      .take(10);
    source.subscribe(val => console.log('first: ' + val));
    setTimeout(() => {
      source.subscribe(val => console.log('second: ' + val));
    }, 5000);
    // "first: 1"
    // "first: 2"
    // "first: 3"
    // "second: 1"
    // "first: 4"
    // "second: 2"
    // ...
    // (the interleaving shown corresponds to a delay of about 2500 ms;
    //  with 5000 ms the second subscriber joins later)

    // Hot: subscribers share one run and only see values emitted after they join
    const source = Rx.Observable.interval(1000)
      .map(val => val + 1)
      .take(10)
      .publish()
      .refCount();
    source.subscribe(val => console.log('first: ' + val));
    setTimeout(() => {
      source.subscribe(val => console.log('second: ' + val));
    }, 5000);
    // "first: 1"
    // "first: 2"
    // "first: 3"
    // "second: 3"
    // "first: 4"
    // ...
    // (again the interleaving for a delay of about 2500 ms)
Hot - Live stream
    .publish() + .refCount()
Cold - Recorded video
    by default
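The live stream versus recorded video analogy can be reproduced without timers. The sketch below uses two hypothetical helpers, `coldCounter` and `hotCounter`, which are not RxJS functions; they only model when the producer starts and who shares it.

```javascript
// Cold: every subscriber triggers its own run of the producer.
function coldCounter() {
  return {
    subscribe(onNext) {
      [1, 2, 3].forEach(value => onNext(value)); // fresh sequence per subscriber
    }
  };
}

// Hot: one shared producer; subscribers only see values emitted after they join.
function hotCounter() {
  const listeners = [];
  return {
    subscribe(onNext) { listeners.push(onNext); },
    emit(value) { listeners.forEach(listener => listener(value)); }
  };
}

const coldLog = [];
const cold = coldCounter();
cold.subscribe(v => coldLog.push('first: ' + v));
cold.subscribe(v => coldLog.push('second: ' + v));
// coldLog: first: 1, first: 2, first: 3, second: 1, second: 2, second: 3

const hotLog = [];
const hot = hotCounter();
hot.subscribe(v => hotLog.push('first: ' + v));
hot.emit(1);
hot.emit(2);
hot.subscribe(v => hotLog.push('second: ' + v)); // joins late, misses 1 and 2
hot.emit(3);
// hotLog: first: 1, first: 2, first: 3, second: 3
```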
Query!
.debounce | .throttle
    // https://remysharp.com/2010/07/21/throttling-function-calls
    function throttle(fn, threshhold, scope) {
      threshhold || (threshhold = 250);
      var last, deferTimer;
      return function () {
        var context = scope || this,
            now = +new Date,
            args = arguments;
        if (last && now < last + threshhold) {
          clearTimeout(deferTimer);
          deferTimer = setTimeout(function () {
            last = now;
            fn.apply(context, args);
          }, threshhold);
        } else {
          last = now;
          fn.apply(context, args);
        }
      };
    }
    window.addEventListener('resize', throttle(function (event) {
      console.log('resized');
    }), false);

    // the same with RxJS
    Rx.Observable.fromEvent(window, 'resize')
      .throttle(250)
      .subscribe(val => console.log('resized', window.innerWidth));
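The slide title names both operators but only shows throttle, so here is a small sketch of how the two differ. It works on a recorded list of timestamped events instead of live timers; `throttleEvents` and `debounceEvents` are hypothetical helpers for illustration, and they model the general idea (first event in a window versus last event before a quiet period), not the exact RxJS implementation.

```javascript
// Each event is { t: timestamp in ms, v: value }; events are sorted by t.

// Throttle: let an event through, then ignore everything for `ms` milliseconds.
function throttleEvents(events, ms) {
  const out = [];
  let lastEmitted = -Infinity;
  for (const event of events) {
    if (event.t - lastEmitted >= ms) {
      out.push(event.v);
      lastEmitted = event.t;
    }
  }
  return out;
}

// Debounce: emit an event only if no other event follows within `ms` milliseconds.
function debounceEvents(events, ms) {
  return events
    .filter((event, i) =>
      i === events.length - 1 || events[i + 1].t - event.t >= ms)
    .map(event => event.v);
}

const resizeEvents = [
  { t: 0, v: 'a' }, { t: 100, v: 'b' }, { t: 200, v: 'c' },
  { t: 600, v: 'd' }, { t: 650, v: 'e' }
];

const throttled = throttleEvents(resizeEvents, 250); // ['a', 'd']
const debounced = debounceEvents(resizeEvents, 250); // ['c', 'e']
```

Throttle reacts at the start of a burst, debounce waits until the burst has settled.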
.map and .flatMap
    Rx.Observable.fromArray(['good', 'evening', 'ng-party!'])
      .map(item => item.toUpperCase())
      .subscribe(val => console.log(val));
    // "GOOD"
    // "EVENING"
    // "NG-PARTY!"

    // map passes the promises through unresolved
    Rx.Observable.fromArray(['good', 'evening', 'ng-party!'])
      .map(item => Promise.resolve(item.toUpperCase()))
      .subscribe(val => console.log(val));
    // [object Object] { ... }, [object Object] { ... }, [object Object] { ... }

    // flatMap unwraps each promise and emits its resolved value
    Rx.Observable.fromArray(['good', 'evening', 'ng-party!'])
      .flatMap(item => Promise.resolve(item.toUpperCase()))
      .subscribe(val => console.log(val));
    // "GOOD"
    // "EVENING"
    // "NG-PARTY!"
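The same difference exists on plain arrays, which may help as an analogy. This sketch uses the built-in `Array.prototype.map` and `Array.prototype.flatMap`; arrays stand in for the inner promises or observables, so it illustrates the flattening step only, not the asynchronous behaviour.

```javascript
const words = ['good', 'evening'];

// map keeps whatever the function returns, so returning a container nests it.
const nested = words.map(word => [word.toUpperCase()]);
// [['GOOD'], ['EVENING']]

// flatMap applies the function and then flattens one level.
const flattened = words.flatMap(word => [word.toUpperCase()]);
// ['GOOD', 'EVENING']
```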
.takeUntil
    const intervalSource = Rx.Observable.interval(500);
    const source = Rx.Observable
      .fromEvent(document.querySelector('#toggle-stream'), 'click');
    source
      .pluck('target', 'checked')
      .filter(val => val)
      // start the interval when the box is checked, stop it on the next click
      .flatMapLatest(() => intervalSource.takeUntil(source))
      .subscribe(val => console.log('isChecked?', val));
.reduce or .scan
    Rx.Observable.fromArray(['It', 'is', 'time', 'for', 'beer', 'ng-party!'])
      .reduce((p, c) => p + ' ' + c, '')
      .subscribe(val => console.log(val.trim()));
    // "It is time for beer ng-party!"

    Rx.Observable.interval(500)
      .timeInterval()
      .pluck('interval')
      .reduce((p, c) => p + c, 10000)
      .subscribe(val => console.log(val));
    // ? nothing is logged: reduce emits only on completion and interval never completes
.reduce or .scan
    Rx.Observable.interval(500)
      .timeInterval()
      .pluck('interval')
      .scan((p, c) => p + c, 10000)
      .subscribe(val => console.log(val));
    // 10502
    // 11001
    // 11501
    // 12001
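The difference is easy to check on a finite array. In the sketch below `scan` is a hypothetical array helper written for this example (arrays have no built-in scan), and the interval values are made up so that they reproduce the numbers printed on the slide.

```javascript
// Hypothetical array helper mirroring the idea of .scan; not an RxJS function.
function scan(values, accumulator, seed) {
  const out = [];
  let acc = seed;
  for (const value of values) {
    acc = accumulator(acc, value);
    out.push(acc); // emit every intermediate result
  }
  return out;
}

// Assumed measured intervals in ms, chosen to match the slide output.
const intervals = [502, 499, 500, 500];

// reduce: one value, available only once the input has ended
const total = intervals.reduce((p, c) => p + c, 10000);
// 12001

// scan: a running value after every element
const running = scan(intervals, (p, c) => p + c, 10000);
// [10502, 11001, 11501, 12001]
```

On an endless source there is no "end of input", which is why only the scan variant produces output.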
Versions
    github.com/Reactive-Extensions/RxJS: v4.0.7
    github.com/ReactiveX/RxJS: 5.0.0-alpha.14
RxJS Next
    npm install rxjs-es

    // import everything
    import Rx from 'rxjs/Rx';
    Rx.Observable.of(1, 2, 3);

    // patch only the operators you need onto Observable
    import {Observable} from 'rxjs/Observable';
    import 'rxjs/add/operator/map';
    Observable.of(1, 2, 3).map(x => x + '!!!'); // etc

    // use operators as standalone functions with the bind operator
    import {Observable} from 'rxjs/Observable';
    import {map} from 'rxjs/operator/map';
    Observable.of(1, 2, 3)::map(x => x + '!!!'); // etc

    /* ES7 proposal: function bind (::), not part of the standard, needs a transpiler such as Babel */
    import { map, takeWhile, forEach } from "iterlib";
    getPlayers()
      ::map(x => x.character())
      ::takeWhile(x => x.strength > 100)
      ::forEach(x => console.log(x));

    /* ES6 equivalent */
    import { map, takeWhile, forEach } from "iterlib";
    let _val;
    _val = getPlayers();
    _val = map.call(_val, x => x.character());
    _val = takeWhile.call(_val, x => x.strength > 100);
    _val = forEach.call(_val, x => console.log(x));
Advanced topics
    Re-play
    Backpressure
    Window
    ForkJoin
    Schedulers
    Generators
    Rx Testing, Virtual Time, Marbles
    WebSocketsSubject
Use cases
    Sane workflows with DOM Events (mousemove, keypress, drag and drop)
    UI Manipulation
    API Communication
    State management (Components, AppState, Dispatching)
    Set operations (map, filter, scan)
    Transducers
Alternative and affiliates
Bacon, Kefir
Angular 2
github.com/acdlite/redux-rx
garbles/yolk
Resources
xgrommx.github.io/rx-book rxmarbles.com egghead.io gitter.im/Reactive-Extensions/RxJS
Q & A
Thank you @vireliam #devsUnited
