Think Async Asynchronous Patterns in NodeJS - Adam L Barrett
Think Async Asynchronous Patterns in NodeJS - Adam L Barrett JavaScript
Morning Exercise
What is Async?
A-synchronous?
Concurrency
Async vs Parallel
Async vs Parallel JavaScript
Why care?
The Patterns…
The Patterns • Callbacks • Thunks • Promises • Tasks • Observables • Generators • Async / Await • Streams • Async Iteration • CSP
The Callback
The Callback Continuation-Passing Style (CPS)
 if you’re nasty 
fs.readFile('/foo.txt', (err, data) => { // If an error occurred, handle it (throw, propagate, etc) if(err) { console.log('Unknown Error'); return; } // Otherwise, log the file contents console.log(data); });
• Can be called multiple times • Controlled is given to the producer • Is the basic JavaScript unit of work
Thunks
let foo = () => 1 + 2;
let foo = ( callback ) => callback(1 + 2); // use it foo(v => console.log(v));
const thunk = require('thunks')(); const fs = require('fs'); thunk( done => fs.stat('package.json', done) )( (error, res) => console.log(error, res) );
function makeASandwichWithSecretSauce(forPerson) { return (dispatch) => { return fetchSecretSauce().then( sauce => dispatch( makeASandwich(forPerson) ), error => dispatch( apologize(forPerson, error) ) ); }; }
• good for passing around work • controlling the flow over time • easy to understand
Promises
const verifyUser = function(username, password) { database.verifyUser(username, password) .then(userInfo => database.getRoles(userInfo)) .then(rolesInfo => database.logAccess(rolesInfo)) .then(finalResult => { //do whatever the 'callback' would do }) .catch(err => { //do whatever the error handler needs }); };
new Promise((resolve, reject) => { const xhr = new XMLHttpRequest(); xhr.open("GET", '/api/ponies-of-equestria'); xhr.onload = () => resolve(xhr.responseText); xhr.onerror = () => reject(xhr.statusText); xhr.send(); });
fetch(url) .then(res => res.json()) .then(response => console.log('Success:', response)) .catch(error => console.error('Error:', error)) .finally(() => console.log('I will happen either way'));
• Eager, not lazy • one value per promise • immutable value once settled • Can’t be cancelled (at this time) • A crazy combination of map and flatmap
Tasks
const task = new Task( (reject, resolve) => { fs.readFile(path, (error, data) => { if (error) reject(error) else resolve(data) }) }) task.fork( error => { throw error }, data => { console.log(data) } )
• Basically like promises without the eager problem • better for composition and such • no crazy behaviour regarding flatmapping
Observables
function listen(element, eventName) { return new Observable( observer => { // Create an event handler which sends data to the sink let handler = event => observer.next(event); // Attach the event handler element.addEventListener(eventName, handler, true); // Return a cleanup function which will cancel the stream return () => { // Detach the event handler from the element element.removeEventListener(eventName, handler, true); }; }); }
// Return an observable of special key down commands function commandKeys(element) { let keyCommands = { "38": "up", "40": "down" }; return listen(element, "keydown") .filter(event => event.keyCode in keyCommands) .map(event => keyCommands[event.keyCode]) }
let subscription = commandKeys(inputElement).subscribe({ next(val) { console.log("Received key command: " + val) }, error(err) { console.log("Received an error: " + err) }, complete() { console.log("Stream complete") }, });
Observable.from({ [Symbol.observable]() { return new Observable(observer => { setTimeout(() => { observer.next("hello"); observer.next("world"); observer.complete(); }, 2000); }); } })
• Composable • Lazy (don’t do anything until observer subscribes) • declarative • multiple values over time
Generators
function* generator(i) { yield i; yield i + 10; }
function* idMaker() { var index = 0; while (true) { yield index++; } } var gen = idMaker(); console.log( gen.next().value ); // 0 console.log( gen.next().value ); // 1 console.log( gen.next().value ); // 2 console.log( gen.next().value ); // 3
function* logGenerator() { console.log( 0 ); console.log( 1, yield ); console.log( 2, yield ); console.log( 3, yield ); } var gen = logGenerator(); gen.next(); // 0 gen.next( 'pretzel' ); // 1 pretzel gen.next( 'california' ); // 2 california gen.next( 'mayonnaise' ); // 3 mayonnaise
function* anotherGenerator( i ) { yield i + 1; yield i + 2; } function* generator( i ) { yield i; yield* anotherGenerator( i ); yield i + 10; } var gen = generator(10); console.log( gen.next().value ); // 10 console.log( gen.next().value ); // 11 console.log( gen.next().value ); // 12 console.log( gen.next().value ); // 20
function* yieldAndReturn() { yield "Y"; return "R"; yield "unreachable"; } var gen = yieldAndReturn() console.log(gen.next()); // { value: "Y", done: false } console.log(gen.next()); // { value: "R", done: true } console.log(gen.next()); // { value: undefined, done: true }
const myIterable = {}; myIterable[Symbol.iterator] = function* () { yield 1; yield 2; yield 3; }; [...myIterable]; // [1, 2, 3]
co(function *(){ const userResult = yield fetch('/api/current-user'); const { id: user } = yield userResult.json(); const r = yield fetch('/api/books', qs.stringify({ user })); const books = yield r.json(); return books; }) .catch(onerror);
• Crazy powerful primitive for building on top of • makes creating iterables a snap • very imperative • led us to async / await
Async / Await
async function getCurrentUserBooks(){ const userResult = await fetch('/api/current-user'); const { id: user } = await userResult.json(); const r = await fetch('/api/books', qs.stringify({ user })); return await r.json(); }
async function getAllTheThings(){ try { await doSomethingAsync(); const results = await nowGetResults(); return await results.things.getAll(); } catch (err) { handleErrorProperly(err) } }
const fs = require('fs'); const util = require('util'); // Convert fs.readFile into Promise version of same const readFile = util.promisify(fs.readFile); async function getStuff() { return await readFile('test'); } getStuff().then(data => { console.log(data); })
• Can clean up so much messy code • highly composable • limited in all the ways promises are • no top level await (yet)
(async function(){ await initializeApp(); console.log('App Initialized'); }())
Streams
Streams
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { fs.readFile('./big.file', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(8000);
const fs = require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { const src = fs.createReadStream('./big.file'); src.pipe(res); }); server.listen(8000);
const { Readable, Writable } = require('stream'); const { Transform, Duplex } = require('stream'); Readable - example fs.createReadStream() Writable - example fs.createWriteStream() Duplex - example net.Socket Transform - example zlib.createDeflate()
readableSrcStream .pipe(transformStream1) .pipe(transformStream2) .pipe(finalWrtitableDest)
const { Writable } = require('stream'); const outStream = new Writable({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } }); process.stdin.pipe(outStream);
const { Transform } = require('stream'); const commaSplitter = new Transform({ readableObjectMode: true, transform(chunk, encoding, callback) { this.push( chunk.toString().trim().split(',') ); callback(); } });
• Should be your goto async abstraction for nodejs • somewhat declarative composition • handles complex parts, like back pressure for you • Saves memory and resources
Async Iteration
async function asyncSum(asyncIterable) { let result = 0; const iterator = asyncIterable[Symbol.asyncIterator](); while (true) { const object = await iterator.next(); if (object.done) break; result += object.value; } return result; }
// Signature: AsyncIterable ! AsyncIterable async function* numberLines(lines) { let n = 1; for await (const line of lines) { yield `${n} ${line}`; n++; } }
async function* readLines(path) { let file = await fileOpen(path); try { while (!file.EOF) { yield await file.readLine(); } } finally { await file.close(); } }
• Composable • built in for-await-of • still declarative • multiple values over time
CSP
import Channel from 'async-csp'; let channel = new Channel(); async function puts(ch) { await ch.put(1); await ch.put(2); await ch.put(3); } async function takes(ch) { console.log(await ch.take()); // resolves to 1 console.log(await ch.take()); // resolves to 2 console.log(await ch.take()); // resolves to 3 } puts(channel); takes(channel);
// Channel × Channel ! void async function numberLines(inputChannel, outputChannel) { for (let n=1;; n++) { const line = await inputChannel.take(); if (line === Channel.DONE) { break; } outputChannel.put(`${n} ${line}`); } outputChannel.close(); }
import Channel from 'async-csp' async function sleep(duration) { return new Promise(resolve => setTimeout(resolve, duration)) } async function player(name, table) { while (true) { let ball = await table.take(); if (ball === Channel.DONE) { console.log('${name}: table is gone!'); break; } ball.hits++; console.log('${name}! Hits: ${ball.hits}'); await sleep(100); await table.put(ball); } } async function pingPong() { console.log('Opening ping-pong channel!'); let table = new Channel(); player('ping', table); player('pong', table); console.log('Serving ball...'); let ball = {hits: 0}; await table.put(ball); await sleep(1000); console.log('Closing ping-pong channel...'); table.close(); await table.done(); console.log('Channel is fully closed!'); console.log('Ball was hit ${ball.hits} times!'); } pingPong()
import Channel from 'async-csp' async function sleep(duration) { return new Promise(resolve => setTimeout(resolve, duration)) } async function player(name, table) { while (true) { let ball = await table.take(); if (ball === Channel.DONE) { console.log('${name}: table is gone!'); break; } ball.hits++; console.log('${name}! Hits: ${ball.hits}'); await sleep(100); await table.put(ball); } }
async function sleep(duration) { return new Promise(resolve => setTimeout(resolve, duration)) } async function player(name, table) { while (true) { let ball = await table.take(); if (ball === Channel.DONE) { console.log('${name}: table is gone!'); break; } ball.hits++; console.log('${name}! Hits: ${ball.hits}'); await sleep(100); await table.put(ball); } } async function pingPong() { console.log('Opening ping-pong channel!');
async function pingPong() { console.log('Opening ping-pong channel!'); let table = new Channel(); player('ping', table); player('pong', table); console.log('Serving ball...'); let ball = {hits: 0}; await table.put(ball); await sleep(1000); console.log('Closing ping-pong channel...'); table.close(); await table.done(); console.log('Channel is fully closed!'); console.log('Ball was hit ${ball.hits} times!'); }
await table.put(ball); await sleep(1000); console.log('Closing ping-pong channel...'); table.close(); await table.done(); console.log('Channel is fully closed!'); console.log('Ball was hit ${ball.hits} times!'); } pingPong()
Opening ping-pong channel! Serving ball... ping! Hits: 1 pong! Hits: 2 ping! Hits: 3 pong! Hits: 4 ping! Hits: 5 pong! Hits: 6 ping! Hits: 7 pong! Hits: 8 ping! Hits: 9 Closing ping-pong channel... pong: table's gone! Channel is fully closed! Ball was hit 9 times! ping: table's gone!
• Composable • Great Separation of concerns • error handling may be odd • simple but powerful
The Patterns • Callbacks • Thunks • Promises • Tasks • Observables • Generators • Async / Await • Streams • Async Iteration • CSP
Think Async Asynchronous Patterns in NodeJS - Adam L Barrett @adamlbarrett BigAB

Think Async: Asynchronous Patterns in NodeJS

  • 1.
    Think Async Asynchronous Patternsin NodeJS - Adam L Barrett
  • 2.
    Think Async Asynchronous Patternsin NodeJS - Adam L Barrett JavaScript
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
    The Patterns • Callbacks •Thunks • Promises • Tasks • Observables • Generators • Async / Await • Streams • Async Iteration • CSP
  • 12.
  • 13.
    The Callback Continuation-Passing Style(CPS)
 if you’re nasty 
  • 14.
    fs.readFile('/foo.txt', (err, data)=> { // If an error occurred, handle it (throw, propagate, etc) if(err) { console.log('Unknown Error'); return; } // Otherwise, log the file contents console.log(data); });
  • 15.
    • Can becalled multiple times • Controlled is given to the producer • Is the basic JavaScript unit of work
  • 16.
  • 17.
    let foo =() => 1 + 2;
  • 18.
    let foo =( callback ) => callback(1 + 2); // use it foo(v => console.log(v));
  • 19.
    const thunk =require('thunks')(); const fs = require('fs'); thunk( done => fs.stat('package.json', done) )( (error, res) => console.log(error, res) );
  • 20.
    function makeASandwichWithSecretSauce(forPerson) { return(dispatch) => { return fetchSecretSauce().then( sauce => dispatch( makeASandwich(forPerson) ), error => dispatch( apologize(forPerson, error) ) ); }; }
  • 21.
    • good forpassing around work • controlling the flow over time • easy to understand
  • 22.
  • 23.
    const verifyUser =function(username, password) { database.verifyUser(username, password) .then(userInfo => database.getRoles(userInfo)) .then(rolesInfo => database.logAccess(rolesInfo)) .then(finalResult => { //do whatever the 'callback' would do }) .catch(err => { //do whatever the error handler needs }); };
  • 24.
    new Promise((resolve, reject)=> { const xhr = new XMLHttpRequest(); xhr.open("GET", '/api/ponies-of-equestria'); xhr.onload = () => resolve(xhr.responseText); xhr.onerror = () => reject(xhr.statusText); xhr.send(); });
  • 25.
    fetch(url) .then(res => res.json()) .then(response=> console.log('Success:', response)) .catch(error => console.error('Error:', error)) .finally(() => console.log('I will happen either way'));
  • 26.
    • Eager, notlazy • one value per promise • immutable value once settled • Can’t be cancelled (at this time) • A crazy combination of map and flatmap
  • 27.
  • 28.
    const task =new Task( (reject, resolve) => { fs.readFile(path, (error, data) => { if (error) reject(error) else resolve(data) }) }) task.fork( error => { throw error }, data => { console.log(data) } )
  • 29.
    • Basically likepromises without the eager problem • better for composition and such • no crazy behaviour regarding flatmapping
  • 30.
  • 31.
    function listen(element, eventName){ return new Observable( observer => { // Create an event handler which sends data to the sink let handler = event => observer.next(event); // Attach the event handler element.addEventListener(eventName, handler, true); // Return a cleanup function which will cancel the stream return () => { // Detach the event handler from the element element.removeEventListener(eventName, handler, true); }; }); }
  • 32.
    // Return anobservable of special key down commands function commandKeys(element) { let keyCommands = { "38": "up", "40": "down" }; return listen(element, "keydown") .filter(event => event.keyCode in keyCommands) .map(event => keyCommands[event.keyCode]) }
  • 33.
    let subscription =commandKeys(inputElement).subscribe({ next(val) { console.log("Received key command: " + val) }, error(err) { console.log("Received an error: " + err) }, complete() { console.log("Stream complete") }, });
  • 34.
    Observable.from({ [Symbol.observable]() { return newObservable(observer => { setTimeout(() => { observer.next("hello"); observer.next("world"); observer.complete(); }, 2000); }); } })
  • 35.
    • Composable • Lazy(don’t do anything until observer subscribes) • declarative • multiple values over time
  • 36.
  • 37.
  • 38.
    function* idMaker() { varindex = 0; while (true) { yield index++; } } var gen = idMaker(); console.log( gen.next().value ); // 0 console.log( gen.next().value ); // 1 console.log( gen.next().value ); // 2 console.log( gen.next().value ); // 3
  • 39.
    function* logGenerator() { console.log(0 ); console.log( 1, yield ); console.log( 2, yield ); console.log( 3, yield ); } var gen = logGenerator(); gen.next(); // 0 gen.next( 'pretzel' ); // 1 pretzel gen.next( 'california' ); // 2 california gen.next( 'mayonnaise' ); // 3 mayonnaise
  • 40.
    function* anotherGenerator( i) { yield i + 1; yield i + 2; } function* generator( i ) { yield i; yield* anotherGenerator( i ); yield i + 10; } var gen = generator(10); console.log( gen.next().value ); // 10 console.log( gen.next().value ); // 11 console.log( gen.next().value ); // 12 console.log( gen.next().value ); // 20
  • 41.
    function* yieldAndReturn() { yield"Y"; return "R"; yield "unreachable"; } var gen = yieldAndReturn() console.log(gen.next()); // { value: "Y", done: false } console.log(gen.next()); // { value: "R", done: true } console.log(gen.next()); // { value: undefined, done: true }
  • 42.
    const myIterable ={}; myIterable[Symbol.iterator] = function* () { yield 1; yield 2; yield 3; }; [...myIterable]; // [1, 2, 3]
  • 43.
    co(function *(){ const userResult= yield fetch('/api/current-user'); const { id: user } = yield userResult.json(); const r = yield fetch('/api/books', qs.stringify({ user })); const books = yield r.json(); return books; }) .catch(onerror);
  • 44.
    • Crazy powerfulprimitive for building on top of • makes creating iterables a snap • very imperative • led us to async / await
  • 45.
  • 46.
    async function getCurrentUserBooks(){ constuserResult = await fetch('/api/current-user'); const { id: user } = await userResult.json(); const r = await fetch('/api/books', qs.stringify({ user })); return await r.json(); }
  • 47.
    async function getAllTheThings(){ try{ await doSomethingAsync(); const results = await nowGetResults(); return await results.things.getAll(); } catch (err) { handleErrorProperly(err) } }
  • 48.
    const fs =require('fs'); const util = require('util'); // Convert fs.readFile into Promise version of same const readFile = util.promisify(fs.readFile); async function getStuff() { return await readFile('test'); } getStuff().then(data => { console.log(data); })
  • 49.
    • Can cleanup so much messy code • highly composable • limited in all the ways promises are • no top level await (yet)
  • 50.
  • 51.
  • 52.
  • 53.
    const fs =require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { fs.readFile('./big.file', (err, data) => { if (err) throw err; res.end(data); }); }); server.listen(8000);
  • 54.
    const fs =require('fs'); const server = require('http').createServer(); server.on('request', (req, res) => { const src = fs.createReadStream('./big.file'); src.pipe(res); }); server.listen(8000);
  • 55.
    const { Readable,Writable } = require('stream'); const { Transform, Duplex } = require('stream'); Readable - example fs.createReadStream() Writable - example fs.createWriteStream() Duplex - example net.Socket Transform - example zlib.createDeflate()
  • 56.
  • 57.
    const { Writable} = require('stream'); const outStream = new Writable({ write(chunk, encoding, callback) { console.log(chunk.toString()); callback(); } }); process.stdin.pipe(outStream);
  • 58.
    const { Transform} = require('stream'); const commaSplitter = new Transform({ readableObjectMode: true, transform(chunk, encoding, callback) { this.push( chunk.toString().trim().split(',') ); callback(); } });
  • 59.
    • Should beyour goto async abstraction for nodejs • somewhat declarative composition • handles complex parts, like back pressure for you • Saves memory and resources
  • 60.
  • 61.
    async function asyncSum(asyncIterable){ let result = 0; const iterator = asyncIterable[Symbol.asyncIterator](); while (true) { const object = await iterator.next(); if (object.done) break; result += object.value; } return result; }
  • 62.
    // Signature: AsyncIterable! AsyncIterable async function* numberLines(lines) { let n = 1; for await (const line of lines) { yield `${n} ${line}`; n++; } }
  • 63.
    async function* readLines(path){ let file = await fileOpen(path); try { while (!file.EOF) { yield await file.readLine(); } } finally { await file.close(); } }
  • 64.
    • Composable • builtin for-await-of • still declarative • multiple values over time
  • 65.
  • 67.
    import Channel from'async-csp'; let channel = new Channel(); async function puts(ch) { await ch.put(1); await ch.put(2); await ch.put(3); } async function takes(ch) { console.log(await ch.take()); // resolves to 1 console.log(await ch.take()); // resolves to 2 console.log(await ch.take()); // resolves to 3 } puts(channel); takes(channel);
  • 68.
    // Channel ×Channel ! void async function numberLines(inputChannel, outputChannel) { for (let n=1;; n++) { const line = await inputChannel.take(); if (line === Channel.DONE) { break; } outputChannel.put(`${n} ${line}`); } outputChannel.close(); }
  • 69.
    import Channel from'async-csp' async function sleep(duration) { return new Promise(resolve => setTimeout(resolve, duration)) } async function player(name, table) { while (true) { let ball = await table.take(); if (ball === Channel.DONE) { console.log('${name}: table is gone!'); break; } ball.hits++; console.log('${name}! Hits: ${ball.hits}'); await sleep(100); await table.put(ball); } } async function pingPong() { console.log('Opening ping-pong channel!'); let table = new Channel(); player('ping', table); player('pong', table); console.log('Serving ball...'); let ball = {hits: 0}; await table.put(ball); await sleep(1000); console.log('Closing ping-pong channel...'); table.close(); await table.done(); console.log('Channel is fully closed!'); console.log('Ball was hit ${ball.hits} times!'); } pingPong()
  • 70.
    import Channel from'async-csp' async function sleep(duration) { return new Promise(resolve => setTimeout(resolve, duration)) } async function player(name, table) { while (true) { let ball = await table.take(); if (ball === Channel.DONE) { console.log('${name}: table is gone!'); break; } ball.hits++; console.log('${name}! Hits: ${ball.hits}'); await sleep(100); await table.put(ball); } }
  • 71.
    async function sleep(duration){ return new Promise(resolve => setTimeout(resolve, duration)) } async function player(name, table) { while (true) { let ball = await table.take(); if (ball === Channel.DONE) { console.log('${name}: table is gone!'); break; } ball.hits++; console.log('${name}! Hits: ${ball.hits}'); await sleep(100); await table.put(ball); } } async function pingPong() { console.log('Opening ping-pong channel!');
  • 72.
    async function pingPong(){ console.log('Opening ping-pong channel!'); let table = new Channel(); player('ping', table); player('pong', table); console.log('Serving ball...'); let ball = {hits: 0}; await table.put(ball); await sleep(1000); console.log('Closing ping-pong channel...'); table.close(); await table.done(); console.log('Channel is fully closed!'); console.log('Ball was hit ${ball.hits} times!'); }
  • 73.
    await table.put(ball); await sleep(1000); console.log('Closingping-pong channel...'); table.close(); await table.done(); console.log('Channel is fully closed!'); console.log('Ball was hit ${ball.hits} times!'); } pingPong()
  • 74.
    Opening ping-pong channel! Servingball... ping! Hits: 1 pong! Hits: 2 ping! Hits: 3 pong! Hits: 4 ping! Hits: 5 pong! Hits: 6 ping! Hits: 7 pong! Hits: 8 ping! Hits: 9 Closing ping-pong channel... pong: table's gone! Channel is fully closed! Ball was hit 9 times! ping: table's gone!
  • 75.
    • Composable • GreatSeparation of concerns • error handling may be odd • simple but powerful
  • 76.
    The Patterns • Callbacks •Thunks • Promises • Tasks • Observables • Generators • Async / Await • Streams • Async Iteration • CSP
  • 77.
    Think Async Asynchronous Patternsin NodeJS - Adam L Barrett @adamlbarrett BigAB