Intro to core.async #cljsyd, July 2013 Leonardo Borges @leonardo_borges www.leonardoborges.com www.thoughtworks.com Tuesday, 13 August 13
Background • Nothing new • Based on Communicating Sequential Processes (CSP) • CSP was first described by Tony Hoare in 1978 • You probably heard about it from the Go community • They love their channels and goroutines Tuesday, 13 August 13
goroutines: lightweight processes // doing some stuff... go myFunction("argument") //does stuff in the background... //continuing about my business... kinda look like futures in this case.... but there’s more to it Tuesday, 13 August 13
lightweight? • goroutines don’t map 1-1 to threads • They get their own thread pool (number of cores + 2 in Clojure, uses the event loop in Clojurescript) • The runtime takes care of multiplexing them • Easy win due to language support Tuesday, 13 August 13
Why? • Looking for more ways to be efficient and achieve concurrency • A thread per client model can get expensive quickly • Threads spend most of their time waiting for things to happen • Put this idle time to good use! Tuesday, 13 August 13
But goroutines aren’t terribly interesting on their own. They’re just the beginning. Tuesday, 13 August 13
Channels • Allow goroutines to talk to each other • First-class citizens • Can be thought of as concurrent blocking queues Tuesday, 13 August 13
Channels c := make(chan string) go func() { time.Sleep(time.Duration(5000) * time.Millisecond) c <- "Leo" }() fmt.Printf("Hello: %sn", <-c) //this will block until the channel has something to give us Tuesday, 13 August 13
But what about Clojure? Patience, young padawan, we’ll get there... Tuesday, 13 August 13
Example 1 • We wish to implement a search service which is itself dependent on 3 other search services: web, images and video • Each individual service has unpredictable performance • Also, clients shouldn’t need to wait for slow services • Stolen from Rob Pike’s presentation, “Go Concurrency Patterns”[1] [1] http://bit.ly/go-concurrency-patterns Tuesday, 13 August 13
Example 1 Video Service Image Service Web Service Search service Client Tuesday, 13 August 13
Example 1: the service var ( Web = fakeSearch("web") Image = fakeSearch("image") Video = fakeSearch("video") ) type Search func(query string) Result func fakeSearch(kind string) Search { return func(query string) Result { time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) return Result(fmt.Sprintf("%s result for %qn", kind, query)) } } Tuesday, 13 August 13
Example 1: the client c := make(chan Result) go func() { c <- Web(query) } () go func() { c <- Image(query) } () go func() { c <- Video(query) } () timeout := time.After(80 * time.Millisecond) for i := 0; i < 3; i++ { select { case result := <-c: results = append(results, result) case <-timeout: fmt.Println("timed out") return } } return Tuesday, 13 August 13
Example 1: the client c := make(chan Result) go func() { c <- Web(query) } () go func() { c <- Image(query) } () go func() { c <- Video(query) } () timeout := time.After(80 * time.Millisecond) for i := 0; i < 3; i++ { select { case result := <-c: results = append(results, result) case <-timeout: fmt.Println("timed out") return } } return Timeout channels: channels which close after msecs Tuesday, 13 August 13
Example 1: the client c := make(chan Result) go func() { c <- Web(query) } () go func() { c <- Image(query) } () go func() { c <- Video(query) } () timeout := time.After(80 * time.Millisecond) for i := 0; i < 3; i++ { select { case result := <-c: results = append(results, result) case <-timeout: fmt.Println("timed out") return } } return Can be used in select blocks to “give up” on slow alternatives Tuesday, 13 August 13
Yes. select/case can be thought of as switch/case statements for channels. Tuesday, 13 August 13
select/case • Makes a single choice from a set of channels • Immediately returns once any of the channels either responds or closes • In our example, if a service is too slow, the timeout channel closes first Tuesday, 13 August 13
Enough Go. Let’s rewrite the code in Clojurescript! Tuesday, 13 August 13
Example 1: the service (defn fake-search [kind] (fn [query] (let [c (chan)] (go (<! (timeout (rand-int 100))) (>! c (str "<span>" kind " result for " query "</span>"))) c))) (def web (fake-search "Web")) (def image (fake-search "Image")) (def video (fake-search "Video")) Tuesday, 13 August 13
Example 1: the client (defn google [query] (let [c (chan) t (timeout 75)] (go (>! c (<! (web query)))) (go (>! c (<! (image query)))) (go (>! c (<! (video query)))) (go (loop [i 0 acc []] (if (> i 2) acc (recur (inc i) (conj acc (alt! [c t] ([v] v))))))))) Tuesday, 13 August 13
Example 1: the client (defn google [query] (let [c (chan) t (timeout 75)] (go (>! c (<! (web query)))) (go (>! c (<! (image query)))) (go (>! c (<! (video query)))) (go (loop [i 0 acc []] (if (> i 2) acc (recur (inc i) (conj acc (alt! [c t] ([v] v))))))))) Same deal: a timeout channel Tuesday, 13 August 13
Example 1: the client (defn google [query] (let [c (chan) t (timeout 75)] (go (>! c (<! (web query)))) (go (>! c (<! (image query)))) (go (>! c (<! (video query)))) (go (loop [i 0 acc []] (if (> i 2) acc (recur (inc i) (conj acc (alt! [c t] ([v] v))))))))) alt! - Clojure’s answer to Go’s select Tuesday, 13 August 13
Demo Tuesday, 13 August 13
Example 2 • From David Nolen’s CSP post [2] • In his words: “We will coordinate three independent processes running at three different speeds via a fourth process which shows the results of the coordination without any obvious use of mutation - only recursion” [2] http://bit.ly/david-nolen-csp • He also said this demo “should seem impossible for those familiar with JavaScript” - Challenge accepted! Tuesday, 13 August 13
This time, demo first. Tuesday, 13 August 13
Example 2: Clojurescript (def c (chan)) (defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>")))) (go (while true (<! (async/timeout 250)) (>! c 1))) (go (while true (<! (async/timeout 1000)) (>! c 2))) (go (while true (<! (async/timeout 1500)) (>! c 3))) (defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v)) (let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10)))))) Tuesday, 13 August 13
Example 2: Clojurescript (def c (chan)) (defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>")))) (go (while true (<! (async/timeout 250)) (>! c 1))) (go (while true (<! (async/timeout 1000)) (>! c 2))) (go (while true (<! (async/timeout 1500)) (>! c 3))) (defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v)) (let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10)))))) The three independent, different speed processes Tuesday, 13 August 13
Example 2: Clojurescript (def c (chan)) (defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>")))) (go (while true (<! (async/timeout 250)) (>! c 1))) (go (while true (<! (async/timeout 1000)) (>! c 2))) (go (while true (<! (async/timeout 1500)) (>! c 3))) (defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v)) (let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10)))))) The fourth process, responsible for rendering Tuesday, 13 August 13
Example 2: Javascript - part I var messageChannel = new MessageChannel(); var tasks = []; messageChannel.port1.onmessage = function(msg) { tasks.shift()(); }; var c = []; function publishValue(value, timeout) { setTimeout(function() { c.push(value); publishValue(value, timeout); }, timeout); } publishValue(1, 250); publishValue(2, 1000); publishValue(3, 1500); Tuesday, 13 August 13
Example 2: Javascript - part II function renderValues(q) { tasks.push(function() { var v = c.shift(); if (v) { q.unshift(v); q = q.slice(0,10); var result = q.reduce(function(acc,p){ return acc+ "<div class='proc-" + p + "'>Process " + p + "</div>"; },""); document.getElementById("messages1").innerHTML = result; } renderValues(q); }); messageChannel.port2.postMessage(0); } renderValues([]); Tuesday, 13 August 13
Cljs vs. js - couldn’t resist it :) (def c (chan)) (defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>")))) (go (while true (<! (async/timeout 250)) (>! c 1))) (go (while true (<! (async/timeout 1000)) (>! c 2))) (go (while true (<! (async/timeout 1500)) (>! c 3))) (defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v)) (let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10)))))) var messageChannel = new MessageChannel(); var tasks = []; messageChannel.port1.onmessage = function(msg) { tasks.shift()(); }; var c = []; function publishValue(value, timeout) { setTimeout(function() { c.push(value); publishValue(value, timeout); }, timeout); } publishValue(1, 250); publishValue(2, 1000); publishValue(3, 1500); function renderValues(q) { tasks.push(function() { var v = c.shift(); if (v) { q.unshift(v); q = q.slice(0,10); var result = q.reduce(function(acc,p){ return acc+ "<div class='proc-" + p + "'>Process " + p + "</div>"; },""); document.getElementById("messages1").innerHTML = result; } renderValues(q); }); messageChannel.port2.postMessage(0); } renderValues([]); Tuesday, 13 August 13
Wait! MessageChannel? Tuesday, 13 August 13
Under core.async’s hood • core.async is composed of several fairly involved macros and functions • At the end of the day, dispatching go blocks is platform specific • JVM has threads whereas JS has one main thread and an event loop Tuesday, 13 August 13
• the Javascript implementation dispatches like this: (ns cljs.core.async.impl.dispatch) ... (defn run [f] (cond (exists? js/MessageChannel) (queue-task f) (exists? js/setImmediate) (js/setImmediate f) :else (js/setTimeout f 0))) Under core.async’s hood Tuesday, 13 August 13
• The JVM on the other hand uses java.util.concurrent.Executors (ns ^{:skip-wiki true} clojure.core.async.impl.dispatch (:require [clojure.core.async.impl.protocols :as impl] [clojure.core.async.impl.exec.threadpool :as tp])) ... (def executor (delay (tp/thread-pool-executor))) (defn run "Runs Runnable r in a thread pool thread" [^Runnable r] (impl/exec @executor r)) Under core.async’s hood Tuesday, 13 August 13
Final thoughts • core.async isn’t magic • if you’re using blocking API’s you’ll starve its thread pool • though async frameworks such as Netty and http-kit can benefit from it • huge gains in cljs - UI’s are inherently concurrent Tuesday, 13 August 13
Questions? Leonardo Borges @leonardo_borges www.leonardoborges.com www.thoughtworks.com Tuesday, 13 August 13
References • http://www.leonardoborges.com/writings/2013/07/06/clojure-core-dot-async-lisp- advantage/ • http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html • http://swannodette.github.io/2013/07/12/communicating-sequential-processes/ • http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/ • http://bryangilbert.com/code/2013/07/19/escaping-callback-hell-with-core-async/ • http://thinkrelevance.com/blog/2013/07/10/rich-hickey-and-core-async-podcast- episode-035 Code: https://github.com/leonardoborges/core-async-intro Tuesday, 13 August 13

Intro to Clojure's core.async

  • 1.
    Intro to core.async #cljsyd,July 2013 Leonardo Borges @leonardo_borges www.leonardoborges.com www.thoughtworks.com Tuesday, 13 August 13
  • 2.
    Background • Nothing new •Based on Communicating Sequential Processes (CSP) • CSP was first described by Tony Hoare in 1978 • You probably heard about it from the Go community • They love their channels and goroutines Tuesday, 13 August 13
  • 3.
    goroutines: lightweight processes // doingsome stuff... go myFunction("argument") //does stuff in the background... //continuing about my business... kinda look like futures in this case.... but there’s more to it Tuesday, 13 August 13
  • 4.
    lightweight? • goroutines don’tmap 1-1 to threads • They get their own thread pool (number of cores + 2 in Clojure, uses the event loop in Clojurescript) • The runtime takes care of multiplexing them • Easy win due to language support Tuesday, 13 August 13
  • 5.
    Why? • Looking formore ways to be efficient and achieve concurrency • A thread per client model can get expensive quickly • Threads spend most of their time waiting for things to happen • Put this idle time to good use! Tuesday, 13 August 13
  • 6.
    But goroutines aren’tterribly interesting on their own. They’re just the beginning. Tuesday, 13 August 13
  • 7.
    Channels • Allow goroutinesto talk to each other • First-class citizens • Can be thought of as concurrent blocking queues Tuesday, 13 August 13
  • 8.
    Channels c := make(chanstring) go func() { time.Sleep(time.Duration(5000) * time.Millisecond) c <- "Leo" }() fmt.Printf("Hello: %sn", <-c) //this will block until the channel has something to give us Tuesday, 13 August 13
  • 9.
    But what aboutClojure? Patience, young padawan, we’ll get there... Tuesday, 13 August 13
  • 10.
    Example 1 • Wewish to implement a search service which is itself dependent on 3 other search services: web, images and video • Each individual service has unpredictable performance • Also, clients shouldn’t need to wait for slow services • Stolen from Rob Pike’s presentation, “Go Concurrency Patterns”[1] [1] http://bit.ly/go-concurrency-patterns Tuesday, 13 August 13
  • 11.
    Example 1 Video ServiceImage Service Web Service Search service Client Tuesday, 13 August 13
  • 12.
    Example 1: theservice var ( Web = fakeSearch("web") Image = fakeSearch("image") Video = fakeSearch("video") ) type Search func(query string) Result func fakeSearch(kind string) Search { return func(query string) Result { time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) return Result(fmt.Sprintf("%s result for %qn", kind, query)) } } Tuesday, 13 August 13
  • 13.
    Example 1: theclient c := make(chan Result) go func() { c <- Web(query) } () go func() { c <- Image(query) } () go func() { c <- Video(query) } () timeout := time.After(80 * time.Millisecond) for i := 0; i < 3; i++ { select { case result := <-c: results = append(results, result) case <-timeout: fmt.Println("timed out") return } } return Tuesday, 13 August 13
  • 14.
    Example 1: theclient c := make(chan Result) go func() { c <- Web(query) } () go func() { c <- Image(query) } () go func() { c <- Video(query) } () timeout := time.After(80 * time.Millisecond) for i := 0; i < 3; i++ { select { case result := <-c: results = append(results, result) case <-timeout: fmt.Println("timed out") return } } return Timeout channels: channels which close after msecs Tuesday, 13 August 13
  • 15.
    Example 1: theclient c := make(chan Result) go func() { c <- Web(query) } () go func() { c <- Image(query) } () go func() { c <- Video(query) } () timeout := time.After(80 * time.Millisecond) for i := 0; i < 3; i++ { select { case result := <-c: results = append(results, result) case <-timeout: fmt.Println("timed out") return } } return Can be used in select blocks to “give up” on slow alternatives Tuesday, 13 August 13
  • 16.
    Yes. select/case canbe thought of as switch/case statements for channels. Tuesday, 13 August 13
  • 17.
    select/case • Makes asingle choice from a set of channels • Immediately returns once any of the channels either responds or closes • In our example, if a service is too slow, the timeout channel closes first Tuesday, 13 August 13
  • 18.
    Enough Go. Let’srewrite the code in Clojurescript! Tuesday, 13 August 13
  • 19.
    Example 1: theservice (defn fake-search [kind] (fn [query] (let [c (chan)] (go (<! (timeout (rand-int 100))) (>! c (str "<span>" kind " result for " query "</span>"))) c))) (def web (fake-search "Web")) (def image (fake-search "Image")) (def video (fake-search "Video")) Tuesday, 13 August 13
  • 20.
    Example 1: theclient (defn google [query] (let [c (chan) t (timeout 75)] (go (>! c (<! (web query)))) (go (>! c (<! (image query)))) (go (>! c (<! (video query)))) (go (loop [i 0 acc []] (if (> i 2) acc (recur (inc i) (conj acc (alt! [c t] ([v] v))))))))) Tuesday, 13 August 13
  • 21.
    Example 1: theclient (defn google [query] (let [c (chan) t (timeout 75)] (go (>! c (<! (web query)))) (go (>! c (<! (image query)))) (go (>! c (<! (video query)))) (go (loop [i 0 acc []] (if (> i 2) acc (recur (inc i) (conj acc (alt! [c t] ([v] v))))))))) Same deal: a timeout channel Tuesday, 13 August 13
  • 22.
    Example 1: theclient (defn google [query] (let [c (chan) t (timeout 75)] (go (>! c (<! (web query)))) (go (>! c (<! (image query)))) (go (>! c (<! (video query)))) (go (loop [i 0 acc []] (if (> i 2) acc (recur (inc i) (conj acc (alt! [c t] ([v] v))))))))) alt! - Clojure’s answer to Go’s select Tuesday, 13 August 13
  • 23.
  • 24.
    Example 2 • FromDavid Nolen’s CSP post [2] • In his words: “We will coordinate three independent processes running at three different speeds via a fourth process which shows the results of the coordination without any obvious use of mutation - only recursion” [2] http://bit.ly/david-nolen-csp • He also said this demo “should seem impossible for those familiar with JavaScript” - Challenge accepted! Tuesday, 13 August 13
  • 25.
    This time, demofirst. Tuesday, 13 August 13
  • 26.
    Example 2: Clojurescript (defc (chan)) (defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>")))) (go (while true (<! (async/timeout 250)) (>! c 1))) (go (while true (<! (async/timeout 1000)) (>! c 2))) (go (while true (<! (async/timeout 1500)) (>! c 3))) (defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v)) (let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10)))))) Tuesday, 13 August 13
  • 27.
    Example 2: Clojurescript (defc (chan)) (defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>")))) (go (while true (<! (async/timeout 250)) (>! c 1))) (go (while true (<! (async/timeout 1000)) (>! c 2))) (go (while true (<! (async/timeout 1500)) (>! c 3))) (defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v)) (let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10)))))) The three independent, different speed processes Tuesday, 13 August 13
  • 28.
    Example 2: Clojurescript (defc (chan)) (defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>")))) (go (while true (<! (async/timeout 250)) (>! c 1))) (go (while true (<! (async/timeout 1000)) (>! c 2))) (go (while true (<! (async/timeout 1500)) (>! c 3))) (defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v)) (let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10)))))) The fourth process, responsible for rendering Tuesday, 13 August 13
  • 29.
    Example 2: Javascript- part I var messageChannel = new MessageChannel(); var tasks = []; messageChannel.port1.onmessage = function(msg) { tasks.shift()(); }; var c = []; function publishValue(value, timeout) { setTimeout(function() { c.push(value); publishValue(value, timeout); }, timeout); } publishValue(1, 250); publishValue(2, 1000); publishValue(3, 1500); Tuesday, 13 August 13
  • 30.
    Example 2: Javascript- part II function renderValues(q) { tasks.push(function() { var v = c.shift(); if (v) { q.unshift(v); q = q.slice(0,10); var result = q.reduce(function(acc,p){ return acc+ "<div class='proc-" + p + "'>Process " + p + "</div>"; },""); document.getElementById("messages1").innerHTML = result; } renderValues(q); }); messageChannel.port2.postMessage(0); } renderValues([]); Tuesday, 13 August 13
  • 31.
    Cljs vs. js- couldn’t resist it :) (def c (chan)) (defn render [q] (apply str (for [p (reverse q)] (str "<div class='proc-" p "'>Process " p "</div>")))) (go (while true (<! (async/timeout 250)) (>! c 1))) (go (while true (<! (async/timeout 1000)) (>! c 2))) (go (while true (<! (async/timeout 1500)) (>! c 3))) (defn peekn "Returns vector of (up to) n items from the end of vector v" [v n] (if (> (count v) n) (subvec v (- (count v) n)) v)) (let [out (by-id "messages")] (go (loop [q []] (set-html! out (render q)) (recur (-> (conj q (<! c)) (peekn 10)))))) var messageChannel = new MessageChannel(); var tasks = []; messageChannel.port1.onmessage = function(msg) { tasks.shift()(); }; var c = []; function publishValue(value, timeout) { setTimeout(function() { c.push(value); publishValue(value, timeout); }, timeout); } publishValue(1, 250); publishValue(2, 1000); publishValue(3, 1500); function renderValues(q) { tasks.push(function() { var v = c.shift(); if (v) { q.unshift(v); q = q.slice(0,10); var result = q.reduce(function(acc,p){ return acc+ "<div class='proc-" + p + "'>Process " + p + "</div>"; },""); document.getElementById("messages1").innerHTML = result; } renderValues(q); }); messageChannel.port2.postMessage(0); } renderValues([]); Tuesday, 13 August 13
  • 32.
  • 33.
    Under core.async’s hood •core.async is composed of several fairly involved macros and functions • At the end of the day, dispatching go blocks is platform specific • JVM has threads whereas JS has one main thread and an event loop Tuesday, 13 August 13
  • 34.
    • the Javascriptimplementation dispatches like this: (ns cljs.core.async.impl.dispatch) ... (defn run [f] (cond (exists? js/MessageChannel) (queue-task f) (exists? js/setImmediate) (js/setImmediate f) :else (js/setTimeout f 0))) Under core.async’s hood Tuesday, 13 August 13
  • 35.
    • The JVMon the other hand uses java.util.concurrent.Executors (ns ^{:skip-wiki true} clojure.core.async.impl.dispatch (:require [clojure.core.async.impl.protocols :as impl] [clojure.core.async.impl.exec.threadpool :as tp])) ... (def executor (delay (tp/thread-pool-executor))) (defn run "Runs Runnable r in a thread pool thread" [^Runnable r] (impl/exec @executor r)) Under core.async’s hood Tuesday, 13 August 13
  • 36.
    Final thoughts • core.asyncisn’t magic • if you’re using blocking API’s you’ll starve its thread pool • though async frameworks such as Netty and http-kit can benefit from it • huge gains in cljs - UI’s are inherently concurrent Tuesday, 13 August 13
  • 37.
  • 38.
    References • http://www.leonardoborges.com/writings/2013/07/06/clojure-core-dot-async-lisp- advantage/ • http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html •http://swannodette.github.io/2013/07/12/communicating-sequential-processes/ • http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/ • http://bryangilbert.com/code/2013/07/19/escaping-callback-hell-with-core-async/ • http://thinkrelevance.com/blog/2013/07/10/rich-hickey-and-core-async-podcast- episode-035 Code: https://github.com/leonardoborges/core-async-intro Tuesday, 13 August 13