1 How reactive programming can help in asynchronous non-blocking environments. Make ReactiveX and Node dance. Enrico Piccinin, Codemotion Rome, April 14th 2018
2 Enrico Piccinin, Italy
3 ReactiveX is a library to implement the Observable pattern ReactiveX - Observable
4 A quick definition of ReactiveX and Observable. What an Observable is: a way to model streams of asynchronous events. What you can do: transform the event streams via operators (apply a functional programming style to streams of events); consume events via subscription; transform functions which use callbacks into Observables. (Marble diagram: events Miri, Charlie, Fede over time, then stream completed – each event comes with its own data.)
5 Subscription allows us to define what to do with the events of a stream (marble diagram: Miri, Charlie, Fede, stream completed):
observable1.subscribe(
  data => {
    // do something with data received
  },
  err => {
    // manage the error
  },
  () => {
    // do something when the Observable completes
  }
)
6 Operators make Observables easy to use in a functional style.
observable1: 1 2 3 4 5
.map(data => data * 2): 2 4 6 8 10
.filter(data => data > 5): 6 8 10
.take(2): 6 8
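A minimal runnable sketch of the chain above, assuming RxJS 5 with its prototype-patched operators (the chaining style used throughout the talk); the values in the comments match the marble diagram on the slide:

const { Observable } = require('rxjs/Rx')  // RxJS 5, operators patched on the prototype

Observable.from([1, 2, 3, 4, 5])
  .map(data => data * 2)      // 2, 4, 6, 8, 10
  .filter(data => data > 5)   // 6, 8, 10
  .take(2)                    // 6, 8
  .subscribe(
    data => console.log(data),      // logs 6, then 8
    err => console.error(err),
    () => console.log('completed')
  )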
7 Let’s see Observables in action in a very simple use case • Read all files in the Source directory • Transform the content of each file • Write a new file in Target directory • Write a line in the log for each file saved • Write on the console when the processing is completed Source Dir Target Dir transform level_1_file level_2_file level_3_file
8 … a real world use case. Source Dir → transform → Target Dir. This is an I/O-bound process – the nirvana of Node: ~150,000 Cobol source files, extract all SQL statements. About 20 times faster than the synchronous, blocking-I/O logic.
9 The callback approach – the traditional Node approach:
readFilesFromSourceDir(dir, cb(err, data) {
  for each file
    readFileContent(path, cb(err, content) {
      transform(content)
      writeFileToTargetDir(newContent, cb(err, data) {
        writeLog(newFile, cb(err, data) {
          Am I done?
        })
      })
    })
  end-for
})
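As a concrete illustration (not the speaker's code), here is a minimal callback-style sketch built on Node's fs module; the 'source' and 'target' directories, the log file name, the transform function and the completion counter are assumptions added to show why "Am I done?" needs manual bookkeeping:

const fs = require('fs')
const path = require('path')

// Assumed placeholder transform – in the talk it extracts SQL / adds line numbers
const transform = content => content.toUpperCase()

fs.readdir('source', (err, files) => {
  if (err) return console.error(err)
  let pending = files.length            // manual bookkeeping to answer "Am I done?"
  files.forEach(file => {
    fs.readFile(path.join('source', file), 'utf8', (err, content) => {
      if (err) return console.error(err)
      fs.writeFile(path.join('target', file), transform(content), err => {
        if (err) return console.error(err)
        fs.appendFile('log.txt', file + ' written\n', err => {
          if (err) return console.error(err)
          if (--pending === 0) console.log('DONE')
        })
      })
    })
  })
})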
10 What happens in an asynchronous non-blocking world: on the timeline, after the SourceDir read, the File read / File written / Log written events of file1, file2, file3 and file4 interleave in no guaranteed order (the same pattern for each file), and only after the last Log written are we Done.
11 Maybe we can see this from a slightly different perspective: functions & streams of events.
12 For this reactive model we need some Observables. Functions & event streams: SourceDir read, File read, File written, Log written, Done. Observables required: one that emits when the directory is read and then completes; one that emits when the content of a file is read and completes when all files are read; one that emits when a new file is written and completes when all files are written; one that emits when a line in the log is written and completes when all log lines are written.
13 We need to start from what Node gives us: "callback based" functions.
Read Directory: fs.readdir(path[, options], callback)
Read File: fs.readFile(path[, options], callback)
Write File: fs.writeFile(file, data[, options], callback)
Append line: fs.write(fd, string[, position[, encoding]], callback)
(shown next to the callback pseudocode from slide 9 and the question: Am I done?)
14 We can create Observables from "callback based" functions:
readDirObs = Observable.bindNodeCallback(fs.readdir)
readDirObs(dir) emits the Array of File Names when the directory is read and then completes.
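Following the same recipe, the other Observable factories used later in the talk can be sketched like this; their real implementation in the demo repo may differ (in particular, using fs.appendFile for the log is an assumption):

const fs = require('fs')
const { Observable } = require('rxjs/Rx')  // RxJS 5

// Each factory returns a function that, when invoked, returns an Observable
// emitting the callback result once and then completing
const readDirObs   = Observable.bindNodeCallback(fs.readdir)
const readFileObs  = Observable.bindNodeCallback(fs.readFile)
const writeFileObs = Observable.bindNodeCallback(fs.writeFile)
const writeLogObs  = Observable.bindNodeCallback(fs.appendFile)  // assumed: "write log" = append a line

// e.g. readDirObs('source').subscribe(fileNames => console.log(fileNames))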
15 Observables can also be created from Arrays:
names = ["Gina", "Fede", "Maria", "Carl", "Rob"]
namesObs = Observable.from(names)
namesObs emits sequentially as many events as the number of objects in the Array (Gina, Fede, Maria, Carl, Rob), each event carrying the object itself as its data, and then completes.
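A quick usage sketch of namesObs, in the same RxJS 5 style:

Observable.from(['Gina', 'Fede', 'Maria', 'Carl', 'Rob'])
  .subscribe(
    name => console.log(name),       // Gina, Fede, Maria, Carl, Rob
    err => console.error(err),
    () => console.log('completed')   // fired after the last name
  )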
16 Now we have all the tools we need to build our solution:
• We can create Observables from callback based functions
• We can create Observables from Arrays ([1, 2, 3, …])
• We can transform Observables into other Observables via operators (observable2 = operator(observable1))
(shown next to the callback pseudocode from slide 9)
17 The first thing to do is to build the function that processes one file:
readFileObs(fileName) emits the file content
→ newContent = transform(content)
→ .switchMap(content => writeFileObs(newContent)) emits the name of the file written
→ .switchMap(fileName => writeLogObs(fileName))
18 We have created a function which returns an Observable that emits once a file has completed its processing (File read → File written → Log written):
function transformFile(name): Observable<string> {
  return readFileObs(name)
    .map(data => transform(data))
    .switchMap(data => writeFileObs(data))
    .switchMap(file => writeLogObs(file))
}
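Hypothetical usage on a single file (the file name is made up for illustration):

transformFile('source/level_1_file.txt').subscribe(
  file => console.log(file + ' processed'),  // emitted once the log line has been written
  err => console.error(err)
)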
19 Back now to the entire journey: it all starts from the first Observable.
readDirObs(dirName) emits the fileNames
→ .switchMap(fileNames => Observable.from(fileNames)) emits one fileName per file
→ .mergeMap(fileName => transformFile(fileName)) merges the inner Observables
→ .subscribe(
     data => console.log(data),
     err => console.error(err),
     () => console.log("DONE"))   // Done
20 How all the pieces of code look (File read → File written → Log written → Done):
function transformFile(name): Observable<string> {
  return readFileObs(name)
    .map(data => transform(data))
    .switchMap(data => writeFileObs(data))
    .switchMap(file => writeLogObs(file))
}
readDirObs(dir: string)
  .switchMap(files => Observable.from(files))
  .mergeMap(file => transformFile(file))
  .subscribe(
    file => console.log(file + ' logged'),
    err => { console.error(err) },
    () => console.log('I am done')
  )
21 Demo – add a line number to each Canto of Divina Commedia
22 How many files do I start reading before closing the first one? .switchMap(files => Observable.from(files)) emits one event per file name, and mergeMap (with no limit) subscribes to transformFile for every one of them right away – so how many files do we open at the same time? Potentially all of them.
23 Let’s make our use case more real – concurrency is limited. Source Dir → transform → Target Dir (level_1_file, level_2_file, level_3_file). Limit concurrency (same callback pseudocode as slide 9).
24 Add a limit to the concurrency of mergeMap:
readDirObs(dirName)
→ .switchMap(data => Observable.from(data)) emits one file name per file
→ .mergeMap(file => transformFile(file), nnn)   // nnn = level of concurrency
→ .subscribe(
     data => console.log(data),
     err => console.error(err),
     () => console.log("DONE"))
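Putting it all together, a self-contained sketch of the pipeline with the concurrency limit applied; the directory names, the log file, the transform, the concurrency value of 10 and the helper implementations are assumptions for illustration, not the speaker's exact code (which is in the repo linked on the last slide):

const fs = require('fs')
const path = require('path')
const { Observable } = require('rxjs/Rx')  // RxJS 5

const readDirObs   = Observable.bindNodeCallback(fs.readdir)
const readFileObs  = Observable.bindNodeCallback(fs.readFile)
const writeFileObs = Observable.bindNodeCallback(fs.writeFile)
const writeLogObs  = Observable.bindNodeCallback(fs.appendFile)

// Assumed transform – the demo adds a line number to each Canto
const transform = content => content.toString().toUpperCase()

function transformFile(name) {
  const target = path.join('target', name)               // assumed target layout
  return readFileObs(path.join('source', name))
    .map(data => transform(data))
    .switchMap(data => writeFileObs(target, data))
    .switchMap(() => writeLogObs('log.txt', target + ' written\n'))
    .map(() => target)                                   // emit the file name once logged
}

readDirObs('source')
  .switchMap(files => Observable.from(files))
  .mergeMap(file => transformFile(file), 10)             // at most 10 files in flight at once
  .subscribe(
    file => console.log(file + ' logged'),
    err => console.error(err),
    () => console.log('I am done')
  )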
25 So what? Are Observables bringing any benefit here vs Promises? For a chain like File read → File written → Log written repeated over files 1 2 3 4, the same thing can be modelled with Observables or with Promises – so the honest answer is: NOT ALWAYS.
26 But what if we look at the big picture? What if we have real streams of events to deal with? An Observable models the whole stream of events (A, B, C, … until Done), while a Promise resolves just once – and that is where Observables pay off over Promises.
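A tiny illustration of the difference, using an interval to stand in for a real event source such as a WebSocket (purely illustrative):

const { Observable } = require('rxjs/Rx')  // RxJS 5

// A Promise settles once: one value (or one error), then nothing more
new Promise(resolve => setTimeout(() => resolve('A'), 100))
  .then(value => console.log('promise:', value))          // promise: A

// An Observable keeps emitting for as long as the stream is alive
Observable.interval(100)                  // 0, 1, 2, ... every 100 ms
  .map(i => String.fromCharCode(65 + i))  // A, B, C, ...
  .take(3)
  .subscribe(
    event => console.log('observable:', event),           // A, then B, then C
    err => console.error(err),
    () => console.log('Done')
  )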
27 … and the world is full of "asynchronous event streams": Serverless functions, Databases, Realtime Database, HTTP, WebSockets.
28 Some details on the topic discussed may be found here https://medium.freecodecamp.org/rxjs-and-node-8f4e0acebc7c The code of the samples shown in the demo can be downloaded from here https://github.com/codemotion-2018-rome-rxjs-node/rxjs-node-fs Thank you
