Asynchronous programming done right. Without race conditions ..ions ..io on ..ns ditions. by Piotr Pelczar (Athlan)
About me Piotr Pelczar Freelancer for 8yrs PHP, Node.js, Java/Groovy Zend Certified Engineer IPIJ, Startups
Stay in touch athlan.pl me@athlan.pl facebook.com/piotr.pelczar github.com/athlan slideshare.net/piotrpelczar linkedin.com/in/ppelczar
Asynchronous programming Asynchronous actions are actions executed in a non-blocking scheme, allowing the main program flow to continue processing.
How software lives in hardware? Operating systems are process based Each process has assigned processor, registers, memory
How software lives in hardware? Process paralelism using threads (thread pools) Switching processor over processes/threads causes context switching
1. context switching = wasting time
Sync programming In trivial, sequential approach Each operation is executed sequentially: O(t) > O(t+1) if O(t) stucks, O(t+1) waits...
Sync programming
This is cool, software flow is predictible But not in high throughput I/O I/O costs because of waiting time...
High throughput I/O High throughput I/O doesn't mean: Memory operations Fast single-thread computing
High throughput I/O High throughput I/O means: HTTP requests Database connections Queue system dispatching HDD operations
2. Avoid I/O blocking
2. Avoid I/O blocking
Single-threaded, event loop model Imagine a man, who has a task: Walk around When bucket is full of water, just pour another bucket Go to next bucket
There is no sequences In async programming, results appears in no sequences operation1(); // will output "operation1 finished." operation2(); // will output "operation2 finished." operation3(); // will output "operation3 finished."
There is no sequences operation1() would be var amqp = require("amqp") var eventbus = amqp.createConnection(); console.log("AMQP connecting..."); eventbus.on("ready", function() { console.log("AMQP connected..."); callback(); return; });
There is no sequences operation2() would be var redis = require("redis") var conn = redis.createClient(port, host, options); console.log("Redis connecting..."); conn.auth(pass, function(err) { if(err) console.log("Redis failed..."); else console.log("Redis connected..."); callback(); return; });
There is no sequences operation3() would be var mongojs = require("mongojs"); console.log("Mongo connecting..."); var conn = mongojs.connect(connectionString); // blocking operation console.log("Mongo connected..."); callback(); return;
There is no sequences Expectations? AMQP connecting... // operation1() AMQP connected... // operation1() Redis connecting... // operation2() Redis failed... // operation2() Mongo connecting... // operation3(), blocking Mongo connected... // operation3()
There is no sequences Expectations?
There is no sequences The result: AMQP connecting... // operation1() Redis connecting... // operation2() Mongo connecting... // operation3(), blocking Mongo connected... // operation3() Redis failed... // operation2() AMQP connected... // operation1()
There is no sequences
So... what functions returns? You can perform future tasks in function, so what will be returned? value123 will be returned, function my_function() { operation1(); operation2(); operation3(); return "value123"; } just after blocking code, without waiting for non-blocking.
Assume: Functions does NOT returns values The function block is executed immedietally from top to bottom. You cannot rely to return value, because it is useless.
Callbacks Callback is the reference to function. var callbackFunction = function(result) { console.log("Result: %s", result) } When operation is done, the callback function is executed. callbackFunction("test1") // "Result: test1" will be printed out
Callbacks If callbackFunction is a variable (value = reference), so can be passed it via function argument. var callbackFunction = function() { ... } someOtherFunction(callbackFunction); function someOtherFunction(callback) { callback(); // execute function from argument }
Callbacks Functions can be defined as anonymous (closures) function someOtherFunction(callback) { var arg1 = "test"; callback(arg1); // execute function from argument } someOtherFunction(function(arg1) { console.log('done... %s', arg1); })
Callbacks can be nested Nesting callbacks makes code unreadeable: var amqp = require('amqp'); var connection = amqp.createConnection(); connection.on('ready', function() { connection.exchange("ex1", function(exchange) { connection.queue('queue1', function(q) { q.bind(exchange, 'r1'); q.subscribe(function(json, headers, info, m) { console.log("msg: " + JSON.stringify(json)); }); }); }); });
Callbacks can be nested Nesting callbacks makes code unreadeable: var amqp = require('amqp'); var connection = amqp.createConnection(); connection.on('ready', function() { connection.exchange("ex1", function(exchange) { connection.queue('queue1', function(q) { q.bind(exchange, 'r1'); q.subscribe(function(json, headers, info, m) { console.log("msg: " + JSON.stringify(json)); table.update(select, data, function() { table.find(select, function(err, rows) { // inserted rows... } }); }); }); }); });
Asynchronous control flows Promise design pattern Libraries that manages callbacks references
Promise design pattern 1. Client fires function that will return result in the future in the future, so it is a promise 2. Function returns promise object immedietaly before non-blocking operations 3. Client registers callbacks 4. Callbacks will be fired in the future, when task is done var resultPromise = loader.loadData(sourceFile) resultPromise(function success(data) { // this function will be called while operation will succeed }, function error(err) { // on fail })
Promise design pattern 1. Create deferred object 2. Return def.promise 3. Call resolve() or reject() var loadData = function(sourceFile) { var def = deferred() , proc = process.spawn('java', ['-jar', 'loadData.jar', sourceFile]) var commandProcessBuff = null , commandProcessBuffError = null; proc.stdout.on('data', function (data) { commandProcessBuff += data }) proc.stderr.on('data', function (data) { commandProcessBuffError += data }) proc.on('close', function (code) { if(null !== commandProcessBuffError) def.reject(commandProcessBuffError) else def.resolve(commandProcessBuff) }) return def.promise }
Promise design pattern
Async Node.js library Provides control flows like: Sequences (series) Waterfalls (sequences with parameters passing) Parallel (with limit) Some/every conditions While/until Queue
Async Node.js library Series
Async Node.js library Series async.series([ function(callback) { // operation1 }, function(callback) { // operation2 }, function(callback) { // operation3 } ], function() { console.log('all operations done') })
Async Node.js library Parallel async.parallel([ function(callback) { // operation1 }, function(callback) { // operation2 }, function(callback) { // operation3 } ], function() { console.log('all operations done') })
Async Node.js library Parallel limit
Async Node.js library Parallel limit var tasks = [ function(callback) { // operation1 }, function(callback) { // operation2 }, // ... ] async.parallelLimit(tasks, 2, function() { console.log('all operations done') })
Async Node.js library Waterfall async.waterfall([ function(callback) { // operation1 callback(null, arg1, arg2) }, function(arg1, arg2, callback) { // operation2 callback(null, foo, bar) }, function(foo, bar, callback) { // operation3 } ], function() { console.log('all operations done') })
Async Node.js library Whilst async.doWhilst( function(done) { // operation1 done(null, arg1, arg2) }, function() { return pages < limit } ], function() { console.log('done') })
Asynchronous programming traps Dealing with callbacks may be tricky. Keep your code clean.
Unnamed callbacks Keep your code clean, don't name callback function callback function doSomething(callback) { return callback; }
Unnamed callbacks function doSomething(callback) { doAnotherThing(function(callback2) { doYetAnotherThing(function(callback3) { return callback(); }) }) }
Unnamed callbacks Instead of this, name your callbacks function doSomething(done) { doAnotherThing(function(doneFetchingFromApi) { doYetAnotherThing(function(doneWritingToDatabase) { return done(); }) }) }
Double callbacks function doSomething(done) { doAnotherThing(function (err) { if (err) done(err); done(null, result); }); } Callback is fired twice!
Double callbacks Fix: Always prepend callback execution with return statement. function doSomething(done) { doAnotherThing(function (err) { if (err) return done(err); return done(null, result); }); } Normally, return ends function execution, why do not keep this rule while async.
Double callbacks Double callbacks are very hard to debug. The callback wrapper can be written and execute it only once. setTimeout(function() { done('a') }, 200) setTimeout(function() { done('b') }, 500)
Double callbacks var CallbackOnce = function(callback) { this.isFired = false this.callback = callback } CallbackOnce.prototype.create = function() { var delegate = this return function() { if(delegate.isFired) return delegate.isFired = true delegate.callback.apply(null, arguments) } }
Double callbacks obj1 = new CallbackOnce(done) // decorate callback safeDone = obj1.create() // safeDone() is proxy function that passes arguments setTimeout(function() { safeDone('a') // safe now... }, 200) setTimeout(function() { safeDone('b') // safe now... }, 500)
Unexpected callbacks Never fire callback until task is done. function doSomething(done) { doAnotherThing(function () { if (condition) { var result = null // prepare result... return done(result); } return done(null); }); } The ending return will be fired even if condition pass.
Unexpected callbacks Never fire callback until task is done. function doSomething(done) { doAnotherThing(function () { if (condition) { var result = null // prepare result... return done(result); } else { return done(null); } }); }
Unexpected callbacks Never use callback in try clause! function (callback) { another_function(function (err, some_data) { if (err) return callback(err); try { callback(null, JSON.parse(some_data)); // error here } catch(err) { callback(new Error(some_data + ' is not a valid JSON')); } }); } If callback throws an exception, then it is executed exactly twice!
Unexpected callbacks Never use callback in try clause! function (callback) { another_function(function (err, some_data) { if (err) return callback(err); try { var parsed = JSON.parse(some_data) } catch(err) { return callback(new Error(some_data + ' is not a valid JSON')); } callback(null, parsed); }); }
Unexpected callbacks Never use callback in try clause!
Take care of events Read docs carefully. Really. function doSomething(done) { var proc = process.spawn('java', ['-jar', 'loadData.jar', sourceFile]) var procBuff = ''; proc.stdout.on('data', function (data) { procBuff += data; }); // WAT?! proc.stderr.on('data', function (data) { done(new Error("An error occured: " + data)) }); proc.on('close', function (code) { done(null, procBuff); } }
Take care of events Read docs carefully. Really. function doSomething(done) { var proc = process.spawn('java', ['-jar', 'loadData.jar', sourceFile]) var procBuff = ''; var procBuffError = ''; proc.stdout.on('data', function (data) { procBuff += data; }); proc.stderr.on('data', function (data) { proc += data; }); proc.on('close', function (code) { if(code !== 0) { return done(new Error("An error occured: " + procBuffError)); } else { return done(null, procBuff) } } }
Unreadable and logs Keep in mind, that asynchronous logs will interweave There are not sequenced Or there will be same log strings
Unexpected callbacks Asynchronous logs will interweave
Unreadable and logs Logs without use context are useless... function getResults(keyword, done) { http.request(url, function(response) { console.log('Fetching from API') response.on('error', function(err) { console.log('API error') }) }); }
Unreadable and logs function getResults(keyword, done) { var logContext = { keyword: keyword } http.request(url, function(response) { console.log(logContext, 'Fetching from API') response.on('error', function(err) { console.log(logContext, 'API error') }) }); }
Unreadable and logs Centralize your logs - use logstash And make them searcheable - Elasticsearch + Kibana
Too many opened background-tasks While running parallel in order to satisfy first-better algorithm, others should be aborted
Too many opened background-tasks Provide cancellation API: var events = require('events') function getResults(keyword) { var def = deferred() var eventbus = new events.EventEmitter() var req = http.request(url, function(response) { var err = null , content = null res.on('data', function(chunk) { content += chunk; }); response.on('close', function() { if(err) return def.reject(err) else return def.resolve(content) }) response.on('error', function(err) { err += err }) });
Too many opened background-tasks Provide cancellation API: var response = getResults('test') response.result(function success() { // ... }, function error() { // ... }) // if we need response.events.emit('abort')
Everything runs in parallel except your code. When currently code is running, (not waiting for I/O descriptors) whole event loop is blocked.
THE END by Piotr Pelczar
Q&A by Piotr Pelczar

Asynchronous programming done right - Node.js

  • 1.
    Asynchronous programming doneright. Without race conditions ..ions ..io on ..ns ditions. by Piotr Pelczar (Athlan)
  • 3.
    About me PiotrPelczar Freelancer for 8yrs PHP, Node.js, Java/Groovy Zend Certified Engineer IPIJ, Startups
  • 4.
    Stay in touch athlan.pl me@athlan.pl facebook.com/piotr.pelczar github.com/athlan slideshare.net/piotrpelczar linkedin.com/in/ppelczar
  • 5.
    Asynchronous programming Asynchronousactions are actions executed in a non-blocking scheme, allowing the main program flow to continue processing.
  • 6.
    How software livesin hardware? Operating systems are process based Each process has assigned processor, registers, memory
  • 7.
    How software livesin hardware? Process paralelism using threads (thread pools) Switching processor over processes/threads causes context switching
  • 8.
    1. context switching= wasting time
  • 9.
    Sync programming Intrivial, sequential approach Each operation is executed sequentially: O(t) > O(t+1) if O(t) stucks, O(t+1) waits...
  • 10.
  • 11.
    This is cool,software flow is predictible But not in high throughput I/O I/O costs because of waiting time...
  • 12.
    High throughput I/O High throughput I/O doesn't mean: Memory operations Fast single-thread computing
  • 13.
    High throughput I/O High throughput I/O means: HTTP requests Database connections Queue system dispatching HDD operations
  • 14.
    2. Avoid I/Oblocking
  • 15.
    2. Avoid I/Oblocking
  • 16.
    Single-threaded, event loop model Imagine a man, who has a task: Walk around When bucket is full of water, just pour another bucket Go to next bucket
  • 17.
    There is nosequences In async programming, results appears in no sequences operation1(); // will output "operation1 finished." operation2(); // will output "operation2 finished." operation3(); // will output "operation3 finished."
  • 18.
    There is nosequences operation1() would be var amqp = require("amqp") var eventbus = amqp.createConnection(); console.log("AMQP connecting..."); eventbus.on("ready", function() { console.log("AMQP connected..."); callback(); return; });
  • 19.
    There is nosequences operation2() would be var redis = require("redis") var conn = redis.createClient(port, host, options); console.log("Redis connecting..."); conn.auth(pass, function(err) { if(err) console.log("Redis failed..."); else console.log("Redis connected..."); callback(); return; });
  • 20.
    There is nosequences operation3() would be var mongojs = require("mongojs"); console.log("Mongo connecting..."); var conn = mongojs.connect(connectionString); // blocking operation console.log("Mongo connected..."); callback(); return;
  • 21.
    There is nosequences Expectations? AMQP connecting... // operation1() AMQP connected... // operation1() Redis connecting... // operation2() Redis failed... // operation2() Mongo connecting... // operation3(), blocking Mongo connected... // operation3()
  • 22.
    There is nosequences Expectations?
  • 23.
    There is nosequences The result: AMQP connecting... // operation1() Redis connecting... // operation2() Mongo connecting... // operation3(), blocking Mongo connected... // operation3() Redis failed... // operation2() AMQP connected... // operation1()
  • 24.
    There is nosequences
  • 25.
    So... what functions returns? You can perform future tasks in function, so what will be returned? value123 will be returned, function my_function() { operation1(); operation2(); operation3(); return "value123"; } just after blocking code, without waiting for non-blocking.
  • 26.
    Assume: Functions does NOT returns values The function block is executed immedietally from top to bottom. You cannot rely to return value, because it is useless.
  • 27.
    Callbacks Callback isthe reference to function. var callbackFunction = function(result) { console.log("Result: %s", result) } When operation is done, the callback function is executed. callbackFunction("test1") // "Result: test1" will be printed out
  • 28.
    Callbacks If callbackFunctionis a variable (value = reference), so can be passed it via function argument. var callbackFunction = function() { ... } someOtherFunction(callbackFunction); function someOtherFunction(callback) { callback(); // execute function from argument }
  • 29.
    Callbacks Functions canbe defined as anonymous (closures) function someOtherFunction(callback) { var arg1 = "test"; callback(arg1); // execute function from argument } someOtherFunction(function(arg1) { console.log('done... %s', arg1); })
  • 30.
    Callbacks can benested Nesting callbacks makes code unreadeable: var amqp = require('amqp'); var connection = amqp.createConnection(); connection.on('ready', function() { connection.exchange("ex1", function(exchange) { connection.queue('queue1', function(q) { q.bind(exchange, 'r1'); q.subscribe(function(json, headers, info, m) { console.log("msg: " + JSON.stringify(json)); }); }); }); });
  • 31.
    Callbacks can benested Nesting callbacks makes code unreadeable: var amqp = require('amqp'); var connection = amqp.createConnection(); connection.on('ready', function() { connection.exchange("ex1", function(exchange) { connection.queue('queue1', function(q) { q.bind(exchange, 'r1'); q.subscribe(function(json, headers, info, m) { console.log("msg: " + JSON.stringify(json)); table.update(select, data, function() { table.find(select, function(err, rows) { // inserted rows... } }); }); }); }); });
  • 32.
    Asynchronous control flows Promise design pattern Libraries that manages callbacks references
  • 33.
    Promise design pattern 1. Client fires function that will return result in the future in the future, so it is a promise 2. Function returns promise object immedietaly before non-blocking operations 3. Client registers callbacks 4. Callbacks will be fired in the future, when task is done var resultPromise = loader.loadData(sourceFile) resultPromise(function success(data) { // this function will be called while operation will succeed }, function error(err) { // on fail })
  • 34.
    Promise design pattern 1. Create deferred object 2. Return def.promise 3. Call resolve() or reject() var loadData = function(sourceFile) { var def = deferred() , proc = process.spawn('java', ['-jar', 'loadData.jar', sourceFile]) var commandProcessBuff = null , commandProcessBuffError = null; proc.stdout.on('data', function (data) { commandProcessBuff += data }) proc.stderr.on('data', function (data) { commandProcessBuffError += data }) proc.on('close', function (code) { if(null !== commandProcessBuffError) def.reject(commandProcessBuffError) else def.resolve(commandProcessBuff) }) return def.promise }
  • 35.
  • 36.
    Async Node.js library Provides control flows like: Sequences (series) Waterfalls (sequences with parameters passing) Parallel (with limit) Some/every conditions While/until Queue
  • 37.
  • 38.
    Async Node.js library Series async.series([ function(callback) { // operation1 }, function(callback) { // operation2 }, function(callback) { // operation3 } ], function() { console.log('all operations done') })
  • 39.
    Async Node.js library Parallel async.parallel([ function(callback) { // operation1 }, function(callback) { // operation2 }, function(callback) { // operation3 } ], function() { console.log('all operations done') })
  • 40.
    Async Node.js library Parallel limit
  • 41.
    Async Node.js library Parallel limit var tasks = [ function(callback) { // operation1 }, function(callback) { // operation2 }, // ... ] async.parallelLimit(tasks, 2, function() { console.log('all operations done') })
  • 42.
    Async Node.js library Waterfall async.waterfall([ function(callback) { // operation1 callback(null, arg1, arg2) }, function(arg1, arg2, callback) { // operation2 callback(null, foo, bar) }, function(foo, bar, callback) { // operation3 } ], function() { console.log('all operations done') })
  • 43.
    Async Node.js library Whilst async.doWhilst( function(done) { // operation1 done(null, arg1, arg2) }, function() { return pages < limit } ], function() { console.log('done') })
  • 44.
    Asynchronous programming traps Dealing with callbacks may be tricky. Keep your code clean.
  • 45.
    Unnamed callbacks Keepyour code clean, don't name callback function callback function doSomething(callback) { return callback; }
  • 46.
    Unnamed callbacks functiondoSomething(callback) { doAnotherThing(function(callback2) { doYetAnotherThing(function(callback3) { return callback(); }) }) }
  • 47.
    Unnamed callbacks Insteadof this, name your callbacks function doSomething(done) { doAnotherThing(function(doneFetchingFromApi) { doYetAnotherThing(function(doneWritingToDatabase) { return done(); }) }) }
  • 48.
    Double callbacks functiondoSomething(done) { doAnotherThing(function (err) { if (err) done(err); done(null, result); }); } Callback is fired twice!
  • 49.
    Double callbacks Fix:Always prepend callback execution with return statement. function doSomething(done) { doAnotherThing(function (err) { if (err) return done(err); return done(null, result); }); } Normally, return ends function execution, why do not keep this rule while async.
  • 50.
    Double callbacks Doublecallbacks are very hard to debug. The callback wrapper can be written and execute it only once. setTimeout(function() { done('a') }, 200) setTimeout(function() { done('b') }, 500)
  • 51.
    Double callbacks varCallbackOnce = function(callback) { this.isFired = false this.callback = callback } CallbackOnce.prototype.create = function() { var delegate = this return function() { if(delegate.isFired) return delegate.isFired = true delegate.callback.apply(null, arguments) } }
  • 52.
    Double callbacks obj1= new CallbackOnce(done) // decorate callback safeDone = obj1.create() // safeDone() is proxy function that passes arguments setTimeout(function() { safeDone('a') // safe now... }, 200) setTimeout(function() { safeDone('b') // safe now... }, 500)
  • 53.
    Unexpected callbacks Neverfire callback until task is done. function doSomething(done) { doAnotherThing(function () { if (condition) { var result = null // prepare result... return done(result); } return done(null); }); } The ending return will be fired even if condition pass.
  • 54.
    Unexpected callbacks Neverfire callback until task is done. function doSomething(done) { doAnotherThing(function () { if (condition) { var result = null // prepare result... return done(result); } else { return done(null); } }); }
  • 55.
    Unexpected callbacks Neveruse callback in try clause! function (callback) { another_function(function (err, some_data) { if (err) return callback(err); try { callback(null, JSON.parse(some_data)); // error here } catch(err) { callback(new Error(some_data + ' is not a valid JSON')); } }); } If callback throws an exception, then it is executed exactly twice!
  • 56.
    Unexpected callbacks Neveruse callback in try clause! function (callback) { another_function(function (err, some_data) { if (err) return callback(err); try { var parsed = JSON.parse(some_data) } catch(err) { return callback(new Error(some_data + ' is not a valid JSON')); } callback(null, parsed); }); }
  • 57.
    Unexpected callbacks Neveruse callback in try clause!
  • 58.
    Take care ofevents Read docs carefully. Really. function doSomething(done) { var proc = process.spawn('java', ['-jar', 'loadData.jar', sourceFile]) var procBuff = ''; proc.stdout.on('data', function (data) { procBuff += data; }); // WAT?! proc.stderr.on('data', function (data) { done(new Error("An error occured: " + data)) }); proc.on('close', function (code) { done(null, procBuff); } }
  • 59.
    Take care ofevents Read docs carefully. Really. function doSomething(done) { var proc = process.spawn('java', ['-jar', 'loadData.jar', sourceFile]) var procBuff = ''; var procBuffError = ''; proc.stdout.on('data', function (data) { procBuff += data; }); proc.stderr.on('data', function (data) { proc += data; }); proc.on('close', function (code) { if(code !== 0) { return done(new Error("An error occured: " + procBuffError)); } else { return done(null, procBuff) } } }
  • 60.
    Unreadable and logs Keep in mind, that asynchronous logs will interweave There are not sequenced Or there will be same log strings
  • 61.
  • 62.
    Unreadable and logs Logs without use context are useless... function getResults(keyword, done) { http.request(url, function(response) { console.log('Fetching from API') response.on('error', function(err) { console.log('API error') }) }); }
  • 63.
    Unreadable and logs function getResults(keyword, done) { var logContext = { keyword: keyword } http.request(url, function(response) { console.log(logContext, 'Fetching from API') response.on('error', function(err) { console.log(logContext, 'API error') }) }); }
  • 64.
    Unreadable and logs Centralize your logs - use logstash And make them searcheable - Elasticsearch + Kibana
  • 65.
    Too many opened background-tasks While running parallel in order to satisfy first-better algorithm, others should be aborted
  • 66.
    Too many opened background-tasks Provide cancellation API: var events = require('events') function getResults(keyword) { var def = deferred() var eventbus = new events.EventEmitter() var req = http.request(url, function(response) { var err = null , content = null res.on('data', function(chunk) { content += chunk; }); response.on('close', function() { if(err) return def.reject(err) else return def.resolve(content) }) response.on('error', function(err) { err += err }) });
  • 67.
    Too many opened background-tasks Provide cancellation API: var response = getResults('test') response.result(function success() { // ... }, function error() { // ... }) // if we need response.events.emit('abort')
  • 68.
    Everything runs inparallel except your code. When currently code is running, (not waiting for I/O descriptors) whole event loop is blocked.
  • 69.
    THE END byPiotr Pelczar
  • 70.
    Q&A by PiotrPelczar