Functional Reactive Programming in the Netflix API
Ben Christensen, Software Engineer – API Platform at Netflix
@benjchristensen, http://www.linkedin.com/in/benjchristensen, http://techblog.netflix.com/
QCon London – Wednesday, March 6, 2013
Watch the video with slide synchronization on InfoQ.com: http://www.infoq.com/presentations/netflix-functional-rx
Presented at QCon London (www.qconlondon.com).
More than 33 million Subscribers in more than 50 Countries and Territories
Netflix accounts for 33% of Peak Downstream Internet Traffic in North America. Netflix subscribers are watching more than 1 billion hours a month.
API traffic has grown from ~20 million/day in 2010 to >2 billion/day. [Chart: millions of API requests per day (0 to 2000) for 2010, 2011, 2012 and today]
[Diagram: Discovery, Streaming]
[Diagram: Netflix API, Streaming]
[Chart: API Request Volume by Audience (Open API, Netflix Devices)]
[Diagram: Netflix API and its dependencies, Dependency A through Dependency R]
[Diagram: device endpoints and the layers beneath them]
Endpoints: /ps3/home, /android/home, /tv/home
Layers shown: Functional Reactive Dynamic Endpoints; Asynchronous Java API; Hystrix fault-isolation layer
Dependency thread pools: A 10, B 8, C 10, D 15, E 5, F 10, G 10, H 10, I 5, J 8, K 15, L 4, M 5, N 10, O 10, P 10, Q 8, R 10, S 8, T 10 threads
RxJava: “a library for composing asynchronous and event-based programs using observable sequences for the Java VM”. A Java port of Rx (Reactive Extensions), https://rx.codeplex.com (.NET and JavaScript by Microsoft)
Do we really need another way of “managing” concurrency?
Discovery of Rx began with a re-architecture ...
... that collapsed network traffic into coarse API calls ... Nested, conditional, parallel execution
... and we wanted to allow anybody to create endpoints, not just the “API Team”
Concurrency without each engineer reading and re-reading this -> [book cover image] (an awesome book, but not everybody is going to read it, nor should they have to; that’s the point)
Owner of API should retain control of concurrency behavior.
    public Data getData();
What if the implementation needs to change from synchronous to asynchronous? How should the client execute that method without blocking? Spawn a thread?
What about ... ?
    public void getData(Callback<T> c);
    public Future<T> getData();
    public Future<List<Future<T>>> getData();
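To make the problem with the nested Future signature concrete, here is a minimal sketch using only java.util.concurrent. The class name, the `getData` stand-in and the item values are hypothetical; the point is only that the client has to block twice (once for the list, once per element) to get at the data.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class NestedFutures {
    // Hypothetical API in the shape shown on the slide:
    // a Future of a list whose elements are themselves Futures.
    static Future<List<Future<String>>> getData(ExecutorService pool) {
        return pool.submit(() -> {
            List<Future<String>> items = new ArrayList<>();
            for (String id : Arrays.asList("a", "b", "c")) {
                items.add(pool.submit(() -> "item-" + id));
            }
            return items;
        });
    }

    // The client has no way to react to values as they arrive:
    // it blocks on the outer Future, then on each inner Future in list order.
    static List<String> collectBlocking() {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            List<String> out = new ArrayList<>();
            for (Future<String> f : getData(pool).get()) { // blocks for the list
                out.add(f.get());                          // blocks per element
            }
            return out;
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collectBlocking()); // prints [item-a, item-b, item-c]
    }
}
```

Even if a later element is ready first, this client still waits on the elements in list order, which is the kind of awkwardness the slide is pointing at.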
Iterable (pull)        Observable (push)
T next()               onNext(T)
throws Exception       onError(Exception)
returns;               onCompleted()

    // Iterable<String>
    // that contains 75 Strings
    getDataFromLocalMemory()
        .skip(10)
        .take(5)
        .map({ s -> return s + "_transformed" })
        .forEach({ println "next => " + it })

    // Observable<String>
    // that emits 75 Strings
    getDataFromNetwork()
        .skip(10)
        .take(5)
        .map({ s -> return s + "_transformed" })
        .subscribe({ println "onNext => " + it })
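The pull/push duality can be sketched in plain Java without any library. The `Observer` interface below is a hypothetical stand-in that mirrors the three callbacks named on the slide; it is not RxJava's own type, and `push` is only an illustration of a producer driving a consumer.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class PullVsPush {
    // Hypothetical minimal observer with the three callbacks from the slide.
    interface Observer<T> {
        void onNext(T value);
        void onError(Exception e);
        void onCompleted();
    }

    // Pull: the consumer drives the loop by asking for the next value.
    static List<String> pull(Iterable<String> source) {
        List<String> seen = new ArrayList<>();
        Iterator<String> it = source.iterator();
        while (it.hasNext()) {
            seen.add("next => " + it.next());
        }
        return seen;
    }

    // Push: the producer drives the loop by calling the observer.
    static void push(List<String> data, Observer<String> observer) {
        try {
            for (String s : data) {
                observer.onNext(s);
            }
        } catch (Exception e) {
            observer.onError(e); // a thrown exception becomes an onError call
            return;
        }
        observer.onCompleted();  // a normal return becomes an onCompleted call
    }

    // Records what the observer was told, so the two styles can be compared.
    static List<String> pushed(List<String> data) {
        List<String> seen = new ArrayList<>();
        push(data, new Observer<String>() {
            public void onNext(String v) { seen.add("onNext => " + v); }
            public void onError(Exception e) { seen.add("onError => " + e.getMessage()); }
            public void onCompleted() { seen.add("onCompleted"); }
        });
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(pull(Arrays.asList("a", "b")));
        System.out.println(pushed(Arrays.asList("a", "b")));
    }
}
```

Because the producer decides when to call `onNext`, it is free to do so from another thread later, which the pull-based loop cannot express without blocking.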
Instead of blocking APIs ...
    class VideoService {
        def VideoList getPersonalizedListOfMovies(userId);
        def VideoBookmark getBookmark(userId, videoId);
        def VideoRating getRating(userId, videoId);
        def VideoMetadata getMetadata(videoId);
    }
... create Observable APIs:
    class VideoService {
        def Observable<VideoList> getPersonalizedListOfMovies(userId);
        def Observable<VideoBookmark> getBookmark(userId, videoId);
        def Observable<VideoRating> getRating(userId, videoId);
        def Observable<VideoMetadata> getMetadata(videoId);
    }
Java8
    Observable.toObservable("one", "two", "three")
        .take(2)
        .subscribe((arg) -> {
            System.out.println(arg);
        });

Scala
    Observable.toObservable("one", "two", "three")
        .take(2)
        .subscribe((arg: String) => {
            println(arg)
        })

Clojure
    (->
      (Observable/toObservable ["one" "two" "three"])
      (.take 2)
      (.subscribe (fn [arg] (println arg))))

Groovy
    Observable.toObservable("one", "two", "three")
        .take(2)
        .subscribe({ arg -> println(arg) })

JRuby
    Observable.toObservable("one", "two", "three")
        .take(2)
        .subscribe(lambda { |arg| puts arg })
    Observable.create({ observer ->
        try {
            observer.onNext(new Video(id))
            observer.onCompleted();
        } catch(Exception e) {
            observer.onError(e);
        }
    })
Asynchronous Observable with Single Value
    def Observable<VideoRating> getRating(userId, videoId) {
        // fetch the VideoRating for this user asynchronously
        return Observable.create({ observer ->
            executor.execute(new Runnable() {
                def void run() {
                    try {
                        VideoRating rating = ... do network call ...
                        observer.onNext(rating)
                        observer.onCompleted();
                    } catch(Exception e) {
                        observer.onError(e);
                    }
                }
            })
        })
    }
Synchronous Observable with Multiple Values
    def Observable<Video> getVideos() {
        return Observable.create({ observer ->
            try {
                for(v in videos) {
                    observer.onNext(v)
                }
                observer.onCompleted();
            } catch(Exception e) {
                observer.onError(e);
            }
        })
    }
Caution: This is eager and will always emit all values regardless of subsequent operators such as take(10)
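The caution can be illustrated with a small plain-Java sketch. It assumes a hypothetical shared `unsubscribed` flag that a take(n)-like consumer flips once it has enough values; this is not RxJava's subscription mechanism, only a way to show the difference between a source that ignores the consumer and one that checks before each emission.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

public class EagerVsCooperative {
    // Eager source: emits every value, whether or not anyone still wants them.
    static int emitEagerly(int total, Consumer<Integer> onNext) {
        int emitted = 0;
        for (int i = 0; i < total; i++) {
            onNext.accept(i);
            emitted++;
        }
        return emitted;
    }

    // Cooperative source: checks the (hypothetical) flag before each emission.
    static int emitCooperatively(int total, AtomicBoolean unsubscribed, Consumer<Integer> onNext) {
        int emitted = 0;
        for (int i = 0; i < total && !unsubscribed.get(); i++) {
            onNext.accept(i);
            emitted++;
        }
        return emitted;
    }

    // Runs a take(n)-like consumer against one of the sources and
    // returns how many values the source actually emitted.
    static int run(boolean cooperative, int total, int n) {
        AtomicBoolean unsubscribed = new AtomicBoolean(false);
        int[] received = {0};
        Consumer<Integer> takeN = value -> {
            if (unsubscribed.get()) {
                return; // values arriving after unsubscribing are dropped
            }
            if (++received[0] == n) {
                unsubscribed.set(true);
            }
        };
        return cooperative
                ? emitCooperatively(total, unsubscribed, takeN)
                : emitEagerly(total, takeN);
    }

    public static void main(String[] args) {
        System.out.println(run(false, 75, 10)); // prints 75: all values were still produced
        System.out.println(run(true, 75, 10));  // prints 10: the source stopped early
    }
}
```

The consumer sees the same 10 values either way; the difference is the wasted work on the producing side, which matters when each value costs a network call.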
Asynchronous Observable with Multiple Values
    def Observable<Video> getVideos() {
        return Observable.create({ observer ->
            executor.execute(new Runnable() {
                def void run() {
                    try {
                        for(id in videoIds) {
                            Video v = ... do network call ...
                            observer.onNext(v)
                        }
                        observer.onCompleted();
                    } catch(Exception e) {
                        observer.onError(e);
                    }
                }
            })
        })
    }
Combining via Merge
    Observable<SomeData> a = getDataA();
    Observable<SomeData> b = getDataB();
    Observable<SomeData> c = getDataC();

    Observable.merge(a, b, c)
        .subscribe(
            { element -> println("data: " + element) },
            { exception -> println("error occurred: " + exception.getMessage()) }
        )
Combining via Zip
    Observable<SomeData> a = getDataA();
    Observable<String> b = getDataB();
    Observable<MoreData> c = getDataC();

    Observable.zip(a, b, c, { x, y, z -> [x, y, z] })
        .subscribe(
            { triple -> println("a: " + triple[0] + " b: " + triple[1] + " c: " + triple[2]) },
            { exception -> println("error occurred: " + exception.getMessage()) }
        )
Error Handling
    Observable<SomeData> a = getDataA();
    Observable<String> b = getDataB();
    Observable<MoreData> c = getDataC()
        .onErrorResumeNext(getFallbackForDataC());

    Observable.zip(a, b, c, { x, y, z -> [x, y, z] })
        .subscribe(
            { triple -> println("a: " + triple[0] + " b: " + triple[1] + " c: " + triple[2]) },
            { exception -> println("error occurred: " + exception.getMessage()) }
        )
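The same idea (fall back to another asynchronous source when one input fails, then combine) can be sketched with the JDK's CompletableFuture. This is a single-value analogy, not RxJava: `withFallback` is a hypothetical helper standing in for onErrorResumeNext, and thenCombine stands in for zip.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

public class FallbackSketch {
    // Hypothetical helper: if 'primary' fails, switch to the future supplied by 'fallback'.
    static <T> CompletableFuture<T> withFallback(CompletableFuture<T> primary,
                                                 Supplier<CompletableFuture<T>> fallback) {
        return primary
                .<CompletableFuture<T>>handle((value, error) ->
                        error == null ? CompletableFuture.completedFuture(value) : fallback.get())
                .thenCompose(next -> next); // flatten the nested future
    }

    static String combine() {
        CompletableFuture<String> a = CompletableFuture.completedFuture("A");
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "B");

        // Simulate the third call failing.
        CompletableFuture<String> failingC = new CompletableFuture<>();
        failingC.completeExceptionally(new IllegalStateException("C failed"));

        CompletableFuture<String> c =
                withFallback(failingC, () -> CompletableFuture.completedFuture("C-fallback"));

        // Combine the three results once all are available (zip-like).
        return a.thenCombine(b, (x, y) -> "a: " + x + " b: " + y)
                .thenCombine(c, (ab, z) -> ab + " c: " + z)
                .join();
    }

    public static void main(String[] args) {
        System.out.println(combine()); // prints a: A b: B c: C-fallback
    }
}
```

Without the fallback, the failure of the third input would propagate to the combined result, which corresponds to the error closure being invoked in the slide's subscribe call.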
    def Observable getVideos(userId) {
        return VideoService.getVideos(userId)
    }
Asynchronous request that returns Observable<Video>
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
    }
Reactive operator on the Observable that takes the first 10 Video objects then unsubscribes.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .map({ Video video ->
                // transform video object
            })
    }
The ‘map’ operator allows transforming the input value into a different output.
    Observable<R> b = Observable<T>.map({ T t ->
        R r = ... transform t ...
        return r;
    })
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                // for each video we want to fetch metadata
                def m = video.getMetadata()
                    .map({ Map<String, String> md ->
                        // transform to the data and format we want
                        return [title: md.get("title"), length: md.get("duration")]
                    })
                // and its rating and bookmark
                def b ...
                def r ...
            })
    }
We change to ‘mapMany’ which is like merge(map()) since we will return an Observable<T> instead of T.
    Observable<R> b = Observable<T>.mapMany({ T t ->
        Observable<R> r = ... transform t ...
        return r;
    })
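The relationship "mapMany is like merge(map())" has a familiar pull-based cousin in java.util.stream: flatMap is map followed by flattening. The sketch below uses that analogy only; `detailsFor` is a hypothetical function, and unlike an asynchronous merge, a Stream flattens strictly in order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class MapManySketch {
    // Hypothetical function that returns a sequence per input, not a single value.
    static Stream<String> detailsFor(String videoId) {
        return Stream.of(videoId + ":metadata", videoId + ":rating");
    }

    // One step: map each input to a sequence and flatten the results.
    static List<String> viaFlatMap(List<String> ids) {
        return ids.stream()
                .flatMap(MapManySketch::detailsFor)
                .collect(Collectors.toList());
    }

    // Two steps: map to a sequence of sequences, then flatten by hand.
    static List<String> viaMapThenFlatten(List<String> ids) {
        List<Stream<String>> nested = ids.stream()
                .map(MapManySketch::detailsFor)
                .collect(Collectors.toList());
        List<String> flat = new ArrayList<>();
        for (Stream<String> inner : nested) {
            inner.forEach(flat::add);
        }
        return flat;
    }

    public static void main(String[] args) {
        List<String> ids = Arrays.asList("v1", "v2");
        System.out.println(viaFlatMap(ids));
        System.out.println(viaMapThenFlatten(ids));
    }
}
```

Both methods produce the same flat list; with plain ‘map’ the caller would be left holding a sequence of sequences, which is why the slides switch operators once the closure returns an Observable.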
In the code above, the nested asynchronous calls return more Observables: m is an Observable<VideoMetadata>, b an Observable<VideoBookmark>, and r an Observable<VideoRating>.
Each Observable transforms its data using ‘map’.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                def m ...
                def b ...
                def r ...
                // compose these together
            })
    }
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                def m ...
                def b ...
                def r ...
                // compose these together
                return Observable.zip(m, b, r, { metadata, bookmark, rating ->
                    // now transform to complete dictionary
                    // of data we want for each Video
                    return [id: video.videoId] << metadata << bookmark << rating
                })
            })
    }
The ‘zip’ operator combines the 3 asynchronous Observables into 1
    Observable.zip(a, b, { a, b ->
        ... operate on values from both a & b ...
        return [a, b]; // i.e. return tuple
    })
For each Video, the zip closure in getVideos returns a single Map (dictionary) of transformed and combined data from 4 asynchronous calls.
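As a rough end-to-end analogy in plain Java, the same shape (limit the list, fan out three asynchronous calls per video, combine them into one Map) can be written with CompletableFuture. Everything here is a hypothetical stand-in: `async` fakes the service calls, the keys and values are made up, and the final `join` blocks where an Observable would keep pushing results.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class ComposeVideoData {
    // Hypothetical stand-in for an asynchronous service call returning one key/value.
    static CompletableFuture<Map<String, Object>> async(String key, Object value) {
        return CompletableFuture.supplyAsync(() -> Collections.singletonMap(key, value));
    }

    // Per video: start three independent calls, then combine their results into one Map.
    static CompletableFuture<Map<String, Object>> videoData(int videoId) {
        CompletableFuture<Map<String, Object>> m = async("title", "Video " + videoId);
        CompletableFuture<Map<String, Object>> b = async("position", 0);
        CompletableFuture<Map<String, Object>> r = async("rating", 5);
        return m.thenCombine(b, (metadata, bookmark) -> {
            Map<String, Object> out = new LinkedHashMap<>();
            out.put("id", videoId);
            out.putAll(metadata);
            out.putAll(bookmark);
            return out;
        }).thenCombine(r, (partial, rating) -> {
            partial.putAll(rating);
            return partial;
        });
    }

    // Take the first 'limit' ids, start all per-video work, then wait for the results.
    static List<Map<String, Object>> firstVideos(List<Integer> ids, int limit) {
        List<CompletableFuture<Map<String, Object>>> pending = ids.stream()
                .limit(limit)
                .map(ComposeVideoData::videoData)
                .collect(Collectors.toList());
        return pending.stream()
                .map(CompletableFuture::join) // blocks here; a push-based API would not
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstVideos(Arrays.asList(1, 2, 3), 2));
    }
}
```

The calls for all selected videos are started before any result is awaited, so the per-video work overlaps; only the last step waits.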
Observable<Video> emits n videos to onNext().
‘take(10)’ takes the first 10, then unsubscribes from the origin. It returns an Observable<Video> that emits 10 Videos.
Each of the 10 Video objects is transformed via the ‘mapMany’ function, which does nested async calls.
For each Video ‘v’ it calls getMetadata(), which returns Observable<VideoMetadata>. These nested async requests return Observables that emit 1 value.
The Observable<VideoMetadata> is transformed via a ‘map’ function to return a Map of key/values.
The same applies to Observable<VideoBookmark> and Observable<VideoRating>.
The 3 ‘mapped’ Observables are combined with a ‘zip’ function that emits a Map with all data.
The full sequence returns an Observable<Map> that emits a Map for each of the 10 Videos.
Client code treats all interactions with the API as asynchronous. The API implementation chooses whether something is blocking or non-blocking and what resources it uses.
Example of latency reduction achieved by increasing number of threads used by Observables.
RxJava coming to Hystrix: https://github.com/Netflix/Hystrix
    Observable<User> u = new GetUserCommand(id).observe();
    Observable<Geo> g = new GetGeoCommand(request).observe();

    Observable.zip(u, g, { user, geo ->
        return [username: user.getUsername(),
                currentLocation: geo.getCounty()]
    })
To get started ...
    <dependency>
        <groupId>com.netflix.rxjava</groupId>
        <artifactId>rxjava-core</artifactId>
        <version>x.y.z</version>
    </dependency>
... or for a different language ...
    <dependency org="com.netflix.rxjava" name="rxjava-core" rev="x.y.z" />
    <dependency org="com.netflix.rxjava" name="rxjava-groovy" rev="x.y.z" />
    <dependency org="com.netflix.rxjava" name="rxjava-clojure" rev="x.y.z" />
    <dependency org="com.netflix.rxjava" name="rxjava-scala" rev="x.y.z" />
    <dependency org="com.netflix.rxjava" name="rxjava-jruby" rev="x.y.z" />
Functional Reactive in the Netflix API with RxJava: http://techblog.netflix.com/2013/02/rxjava-netflix-api.html
Optimizing the Netflix API: http://techblog.netflix.com/2013/01/optimizing-netflix-api.html
RxJava: https://github.com/Netflix/RxJava (@RxJava)
Ben Christensen: @benjchristensen, http://www.linkedin.com/in/benjchristensen
Netflix is Hiring: http://jobs.netflix.com

Functional Reactive Programming in the Netflix API

  • 1.
    Functional Reactive Programming inthe Netflix API Ben Christensen Software Engineer – API Platform at Netflix @benjchristensen http://www.linkedin.com/in/benjchristensen http://techblog.netflix.com/ QCon London – March 6 2013 onsdag den 6. marts 13
  • 2.
    InfoQ.com: News &Community Site • 750,000 unique visitors/month • Published in 4 languages (English, Chinese, Japanese and Brazilian Portuguese) • Post content from our QCon conferences • News 15-20 / week • Articles 3-4 / week • Presentations (videos) 12-15 / week • Interviews 2-3 / week • Books 1 / month Watch the video with slide synchronization on InfoQ.com! http://www.infoq.com/presentations /netflix-functional-rx
  • 3.
    Presented at QConLondon www.qconlondon.com Purpose of QCon - to empower software development by facilitating the spread of knowledge and innovation Strategy - practitioner-driven conference designed for YOU: influencers of change and innovation in your teams - speakers and topics driving the evolution and innovation - connecting and catalyzing the influencers and innovators Highlights - attended by more than 12,000 delegates since 2007 - held in 9 cities worldwide
  • 4.
    onsdag den 6.marts 13
  • 5.
    More than 33million Subscribers in more than 50 Countries and Territories onsdag den 6. marts 13
  • 6.
    Netflix accounts for33% of Peak Downstream Internet Traffic in North America Netflix subscribers are watching more than 1 billion hours a month onsdag den 6. marts 13
  • 7.
    API traffic hasgrown from ~20 million/day in 2010 to >2 billion/day 0 500 1000 1500 2000 2010 2011 2012 Today millionsofAPIrequestsperday onsdag den 6. marts 13
  • 8.
  • 9.
  • 10.
    onsdag den 6.marts 13
  • 11.
    Open API NetflixDevices API Request Volume by Audience onsdag den 6. marts 13
  • 12.
    Netflix API Dependency A DependencyD Dependency G Dependency J Dependency M Dependency P Dependency B Dependency E Dependency H Dependency K Dependency N Dependency Q Dependency C Dependency F Dependency I Dependency L Dependency O Dependency R onsdag den 6. marts 13
  • 13.
    /ps3/home Dependency F 10 Threads DependencyG 10 Threads Dependency H 10 Threads Dependency I 5 Threads Dependency J 8 Threads Dependency A 10 Threads Dependency B 8 Threads Dependency C 10 Threads Dependency D 15 Threads Dependency E 5 Threads Dependency K 15 Threads Dependency L 4 Threads Dependency M 5 Threads Dependency N 10 Threads Dependency O 10 Threads Dependency P 10 Threads Dependency Q 8 Threads Dependency R 10 Threads Dependency S 8 Threads Dependency T 10 Threads /android/home /tv/home Functional Reactive Dynamic Endpoints Asynchronous Java API onsdag den 6. marts 13
  • 14.
    /ps3/home Dependency F 10 Threads DependencyG 10 Threads Dependency H 10 Threads Dependency I 5 Threads Dependency J 8 Threads Dependency A 10 Threads Dependency B 8 Threads Dependency C 10 Threads Dependency D 15 Threads Dependency E 5 Threads Dependency K 15 Threads Dependency L 4 Threads Dependency M 5 Threads Dependency N 10 Threads Dependency O 10 Threads Dependency P 10 Threads Dependency Q 8 Threads Dependency R 10 Threads Dependency S 8 Threads Dependency T 10 Threads /android/home /tv/home Functional Reactive Dynamic Endpoints Asynchronous Java API Hystrix fault-isolation layer onsdag den 6. marts 13
  • 15.
    /ps3/home Dependency F 10 Threads DependencyG 10 Threads Dependency H 10 Threads Dependency I 5 Threads Dependency J 8 Threads Dependency A 10 Threads Dependency B 8 Threads Dependency C 10 Threads Dependency D 15 Threads Dependency E 5 Threads Dependency K 15 Threads Dependency L 4 Threads Dependency M 5 Threads Dependency N 10 Threads Dependency O 10 Threads Dependency P 10 Threads Dependency Q 8 Threads Dependency R 10 Threads Dependency S 8 Threads Dependency T 10 Threads /android/home /tv/home Functional Reactive Dynamic Endpoints Asynchronous Java API onsdag den 6. marts 13
  • 16.
    RxJava “a library forcomposing asynchronous and event-based programs using observable sequences for the Java VM” A Java port of Rx (Reactive Extensions) https://rx.codeplex.com (.Net and Javascript by Microsoft) onsdag den 6. marts 13
  • 17.
    Do we reallyneed another way of “managing” concurrency? onsdag den 6. marts 13
  • 18.
    Discovery of Rxbegan with a re-architecture ... onsdag den 6. marts 13
  • 19.
    ... that collapsednetwork traffic into coarse API calls ... onsdag den 6. marts 13
  • 20.
    ... that collapsednetwork traffic into coarse API calls ... Nested, conditional, parallel execution onsdag den 6. marts 13
  • 21.
    ... and we wantedto allow anybody to create endpoints, not just the “API Team” onsdag den 6. marts 13
  • 22.
    onsdag den 6.marts 13
  • 23.
    Concurrency without each engineerreading and re-reading this -> (awesome book ... everybody isn’t going to - or should have to - read it though, that’s the point) onsdag den 6. marts 13
  • 24.
    Owner of APIshould retain control of concurrency behavior. onsdag den 6. marts 13
  • 25.
    public Data getData(); Whatif the implementation needs to change from synchronous to asynchronous? How should the client execute that method without blocking? spawn a thread? Owner of API should retain control of concurrency behavior. onsdag den 6. marts 13
  • 26.
    public void getData(Callback<T>c); public Future<T> getData(); public Future<List<Future<T>>> getData(); What about ... ? onsdag den 6. marts 13
  • 27.
  • 28.
    Iterable pull Observable push T next() throws Exception returns; onNext(T) onError(Exception) onCompleted()  //  Iterable<String>    //  that  contains  75  Strings  getDataFromLocalMemory()    .skip(10)    .take(5)    .map({  s  -­‐>        return  s  +  "_transformed"})    .forEach(          {  println  "next  =>  "  +  it})  //  Observable<String>    //  that  emits  75  Strings  getDataFromNetwork()    .skip(10)    .take(5)    .map({  s  -­‐>        return  s  +  "_transformed"})    .subscribe(          {  println  "onNext  =>  "  +  it}) onsdag den 6. marts 13
  • 29.
    Iterable pull Observable push T next() throws Exception returns; onNext(T) onError(Exception) onCompleted()  //  Iterable<String>    //  that  contains  75  Strings  getDataFromLocalMemory()    .skip(10)    .take(5)    .map({  s  -­‐>        return  s  +  "_transformed"})    .forEach(          {  println  "onNext  =>  "  +  it})  //  Observable<String>    //  that  emits  75  Strings  getDataFromNetwork()    .skip(10)    .take(5)    .map({  s  -­‐>        return  s  +  "_transformed"})    .subscribe(          {  println  "onNext  =>  "  +  it}) onsdag den 6. marts 13
  • 30.
    Instead of blockingAPIs ... class  VideoService  {      def  VideoList  getPersonalizedListOfMovies(userId);      def  VideoBookmark  getBookmark(userId,  videoId);      def  VideoRating  getRating(userId,  videoId);      def  VideoMetadata  getMetadata(videoId); } class  VideoService  {      def  Observable<VideoList>  getPersonalizedListOfMovies(userId);      def  Observable<VideoBookmark>  getBookmark(userId,  videoId);      def  Observable<VideoRating>  getRating(userId,  videoId);      def  Observable<VideoMetadata>  getMetadata(videoId); } ... create Observable APIs: onsdag den 6. marts 13
  • 31.
    onsdag den 6.marts 13
  • 32.
    onsdag den 6.marts 13
  • 33.
    onsdag den 6.marts 13
  • 34.
    onsdag den 6.marts 13
  • 35.
    onsdag den 6.marts 13
  • 36.
    onsdag den 6.marts 13
  • 37.
    Observable.toObservable("one",  "two",  "three")          .take(2)          .subscribe((arg)  -­‐>  {                    System.out.println(arg);          }); Java8 Observable.toObservable("one",  "two",  "three")    .take(2)    .subscribe((arg:  String)  =>  {            println(arg)    }) Scala (-­‐>      (Observable/toObservable  ["one"  "two"  "three"])    (.take  2)      (.subscribe  (fn  [arg]  (println  arg)))) Clojure    Observable.toObservable("one",  "two",  "three")        .take(2)          .subscribe({arg  -­‐>  println(arg)}) Groovy    Observable.toObservable("one",  "two",  "three")        .take(2)          .subscribe(lambda  {  |arg|  puts  arg  }) JRuby onsdag den 6. marts 13
  • 38.
           Observable.create({  observer  -­‐>            try  {                  observer.onNext(new  Video(id))                observer.onCompleted();            }  catch(Exception  e)  {                observer.onError(e);            }        }) onsdag den 6. marts 13
  • 39.
           def  Observable<VideoRating>  getRating(userId,  videoId)  {                //  fetch  the  VideoRating  for  this  user  asynchronously                return  Observable.create({  observer  -­‐>                        executor.execute(new  Runnable()  {                                def  void  run()  {                                    try  {                                          VideoRating  rating  =  ...  do  network  call  ...                                        observer.onNext(rating)                                        observer.onCompleted();                                      }  catch(Exception  e)  {                                        observer.onError(e);                                      }                                      }                        })                })        } Asynchronous Observable with Single Value onsdag den 6. marts 13
  • 41.
    Synchronous Observable with Multiple Values

    def Observable<Video> getVideos() {
        return Observable.create({ observer ->
            try {
                for(v in videos) {
                    observer.onNext(v)
                }
                observer.onCompleted();
            } catch(Exception e) {
                observer.onError(e);
            }
        })
    }

    Caution: This is eager and will always emit all values regardless of subsequent operators such as take(10)
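
    The caution about eager emission can be illustrated without RxJava. The following is a minimal plain-Java sketch, not the library's implementation: `emitAll` and `take` are hypothetical helpers that mimic a synchronous producer loop and a take(n)-style consumer. Because the loop has no way to learn that the consumer has lost interest, it still pushes every item.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java sketch, not RxJava: all names here are hypothetical.
class EagerEmissionSketch {

    /** Eager synchronous producer: pushes every item and returns how many it pushed. */
    static int emitAll(List<String> videos, Consumer<String> onNext) {
        int produced = 0;
        for (String v : videos) {
            onNext.accept(v); // no unsubscribe check, so the loop never stops early
            produced++;
        }
        return produced;
    }

    /** take(n)-like consumer: keeps the first n items and silently drops the rest. */
    static Consumer<String> take(int n, List<String> sink) {
        return item -> {
            if (sink.size() < n) {
                sink.add(item);
            }
        };
    }

    public static void main(String[] args) {
        List<String> received = new ArrayList<>();
        int produced = emitAll(Arrays.asList("v1", "v2", "v3", "v4", "v5"), take(2, received));
        System.out.println("received=" + received + " produced=" + produced);
    }
}
```

    The consumer ends up with two items while the producer still did the work for all five, which is the cost the slide warns about.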
  • 43.
    Asynchronous Observable with Multiple Values

    def Observable<Video> getVideos() {
        return Observable.create({ observer ->
            executor.execute(new Runnable() {
                def void run() {
                    try {
                        for(id in videoIds) {
                            Video v = ... do network call ...
                            observer.onNext(v)
                        }
                        observer.onCompleted();
                    } catch(Exception e) {
                        observer.onError(e);
                    }
                }
            })
        })
    }
  • 45.
    Combining via Merge

    Observable<SomeData> a = getDataA();
    Observable<SomeData> b = getDataB();
    Observable<SomeData> c = getDataC();

    Observable.merge(a, b, c)
        .subscribe(
            { element -> println("data: " + element)},
            { exception -> println("error occurred: " + exception.getMessage())}
        )
  • 46.
    Combining via Zip

    Observable<SomeData> a = getDataA();
    Observable<String> b = getDataB();
    Observable<MoreData> c = getDataC();

    Observable.zip(a, b, c, {x, y, z -> [x, y, z]})
        .subscribe(
            { triple -> println("a: " + triple[0] + " b: " + triple[1] + " c: " + triple[2])},
            { exception -> println("error occurred: " + exception.getMessage())}
        )
  • 47.
    Error Handling

    Observable<SomeData> a = getDataA();
    Observable<String> b = getDataB();
    Observable<MoreData> c = getDataC();

    Observable.zip(a, b, c, {x, y, z -> [x, y, z]})
        .subscribe(
            { triple -> println("a: " + triple[0] + " b: " + triple[1] + " c: " + triple[2])},
            { exception -> println("error occurred: " + exception.getMessage())}
        )
  • 48.
    Error Handling

    Observable<SomeData> a = getDataA();
    Observable<String> b = getDataB();
    Observable<MoreData> c = getDataC()
        .onErrorResumeNext(getFallbackForDataC());

    Observable.zip(a, b, c, {x, y, z -> [x, y, z]})
        .subscribe(
            { triple -> println("a: " + triple[0] + " b: " + triple[1] + " c: " + triple[2])},
            { exception -> println("error occurred: " + exception.getMessage())}
        )
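
    The effect of attaching a fallback to one source before combining can be sketched with the JDK's CompletableFuture as a rough single-value analogue. This is not RxJava: `zip3` and `failingDataC` are hypothetical names, and `exceptionally` substitutes a value, whereas `onErrorResumeNext` switches to another Observable.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// CompletableFuture analogue of zip plus a per-source fallback; names are hypothetical.
class FallbackZipSketch {

    /** Combines three single-valued async results into one list, in argument order. */
    static CompletableFuture<List<String>> zip3(CompletableFuture<String> a,
                                                CompletableFuture<String> b,
                                                CompletableFuture<String> c) {
        return a.thenCombine(b, (x, y) -> Arrays.asList(x, y))
                .thenCombine(c, (xy, z) -> Arrays.asList(xy.get(0), xy.get(1), z));
    }

    /** Stand-in for a getDataC() call that fails. */
    static CompletableFuture<String> failingDataC() {
        CompletableFuture<String> f = new CompletableFuture<>();
        f.completeExceptionally(new IllegalStateException("C unavailable"));
        return f;
    }

    public static void main(String[] args) {
        CompletableFuture<String> a = CompletableFuture.supplyAsync(() -> "A");
        CompletableFuture<String> b = CompletableFuture.supplyAsync(() -> "B");
        // The fallback is attached to c only, so a and b are unaffected.
        CompletableFuture<String> c = failingDataC().exceptionally(e -> "C-fallback");
        System.out.println(zip3(a, b, c).join());
    }
}
```

    Without the fallback on c, the combined result fails as a whole; with it, the failure is contained at the source that produced it.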
  • 50.
    def Observable getVideos(userId) {
        return VideoService.getVideos(userId)
    }

    Asynchronous request that returns Observable<Video>
  • 52.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
    }

    Reactive operator on the Observable that takes the first 10 Video objects then unsubscribes.
  • 54.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .map({ Video video ->
                // transform video object
            })
    }

    The ‘map’ operator allows transforming the input value into a different output.
  • 55.
    Observable<R> b = Observable<T>.map({ T t ->
        R r = ... transform t ...
        return r;
    })
  • 57.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                // for each video we want to fetch metadata
                def m = video.getMetadata()
                    .map({ Map<String, String> md ->
                        // transform to the data and format we want
                        return [title: md.get("title"), length: md.get("duration")]
                    })
                // and its rating and bookmark
                def b ...
                def r ...
            })
    }

    We change to ‘mapMany’ which is like merge(map()) since we will return an Observable<T> instead of T.
  • 58.
    Observable<R> b = Observable<T>.mapMany({ T t ->
        Observable<R> r = ... transform t ...
        return r;
    })
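
    The difference in shape between ‘map’ (T to R) and ‘mapMany’ (T to a sequence of R, flattened) has a close synchronous analogue in java.util.stream, where the flattening operation is called flatMap. The sketch below is only that analogy, with hypothetical method names; streams are pull-based and ordered, whereas mapMany merges asynchronous Observables whose values may interleave.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Synchronous analogy using JDK streams; method names are hypothetical.
class MapVersusMapManySketch {

    /** map-style: exactly one output per input (String -> Integer). */
    static List<Integer> titleLengths(List<String> titles) {
        return titles.stream()
                .map(String::length)
                .collect(Collectors.toList());
    }

    /** mapMany-style: each input yields a sequence, and the sequences are flattened into one. */
    static List<String> words(List<String> titles) {
        return titles.stream()
                .flatMap(title -> Stream.of(title.split(" ")))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> titles = Arrays.asList("House of Cards", "Arrested Development");
        System.out.println(titleLengths(titles));
        System.out.println(words(titles));
    }
}
```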
  • 60.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                // for each video we want to fetch metadata
                def m = video.getMetadata()
                    .map({ Map<String, String> md ->
                        // transform to the data and format we want
                        return [title: md.get("title"), length: md.get("duration")]
                    })
                // and its rating and bookmark
                def b ...
                def r ...
            })
    }

    Nested asynchronous calls that return more Observables.
  • 61.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                // for each video we want to fetch metadata
                def m = video.getMetadata()
                    .map({ Map<String, String> md ->
                        // transform to the data and format we want
                        return [title: md.get("title"), length: md.get("duration")]
                    })
                // and its rating and bookmark
                def b ...
                def r ...
            })
    }

    Observable<VideoMetadata>
    Observable<VideoBookmark>
    Observable<VideoRating>
  • 62.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                // for each video we want to fetch metadata
                def m = video.getMetadata()
                    .map({ Map<String, String> md ->
                        // transform to the data and format we want
                        return [title: md.get("title"), length: md.get("duration")]
                    })
                // and its rating and bookmark
                def b ...
                def r ...
            })
    }

    Each Observable transforms its data using ‘map’
  • 63.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                // for each video we want to fetch metadata
                def m = video.getMetadata()
                    .map({ Map<String, String> md ->
                        // transform to the data and format we want
                        return [title: md.get("title"), length: md.get("duration")]
                    })
                // and its rating and bookmark
                def b ...
                def r ...
                // compose these together
            })
    }
  • 66.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                def m ...
                def b ...
                def r ...
                // compose these together
                return Observable.zip(m, b, r, { metadata, bookmark, rating ->
                    // now transform to complete dictionary
                    // of data we want for each Video
                    return [id: video.videoId] << metadata << bookmark << rating
                })
            })
    }

    The ‘zip’ operator combines the 3 asynchronous Observables into 1
  • 67.
    Observable.zip(a, b, { a, b ->
        ... operate on values from both a & b ...
        return [a, b]; // i.e. return tuple
    })
  • 68.
    def Observable<Map> getVideos(userId) {
        return VideoService.getVideos(userId)
            // we only want the first 10 of each list
            .take(10)
            .mapMany({ Video video ->
                def m ...
                def b ...
                def r ...
                // compose these together
                return Observable.zip(m, b, r, { metadata, bookmark, rating ->
                    // now transform to complete dictionary
                    // of data we want for each Video
                    return [id: video.videoId] << metadata << bookmark << rating
                })
            })
    }

    return a single Map (dictionary) of transformed and combined data from 4 asynchronous calls
  • 70.
    Observable<Video> emits n videos to onNext()
  • 71.
    Takes first 10 then unsubscribes from origin. Returns Observable<Video> that emits 10 Videos.
  • 72.
    For each of the 10 Video objects it transforms via ‘mapMany’ function that does nested async calls.
  • 73.
    For each Video ‘v’ it calls getMetadata() which returns Observable<VideoMetadata>. These nested async requests return Observables that emit 1 value.
  • 74.
    The Observable<VideoMetadata> is transformed via a ‘map’ function to return a Map of key/values.
  • 75.
    Same for Observable<VideoBookmark> and Observable<VideoRating>
  • 76.
    The 3 ‘mapped’ Observables are combined with a ‘zip’ function that emits a Map with all data.
  • 77.
    The full sequence returns an Observable<Map> that emits a Map for each of the 10 Videos.
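
    The sequence walked through above (take 10, start nested async calls per video, map each result, combine into one Map per video) can be approximated with JDK classes only. This is a hedged sketch, not the Netflix implementation: `metadata`, `bookmark`, `rating`, `videoData` and `getVideos` are hypothetical stand-ins returning made-up values, CompletableFuture replaces the single-value Observables, and the final `join` blocks, which the Observable version does not need to do.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// JDK-only approximation of the take / mapMany / map / zip pipeline; all names are hypothetical.
class VideoPipelineSketch {

    // Stand-ins for the nested service calls; each yields one already-mapped value asynchronously.
    static CompletableFuture<Map<String, Object>> metadata(int videoId) {
        return CompletableFuture.supplyAsync(
                () -> Collections.<String, Object>singletonMap("title", "video-" + videoId));
    }

    static CompletableFuture<Map<String, Object>> bookmark(int videoId) {
        return CompletableFuture.supplyAsync(
                () -> Collections.<String, Object>singletonMap("position", 0));
    }

    static CompletableFuture<Map<String, Object>> rating(int videoId) {
        return CompletableFuture.supplyAsync(
                () -> Collections.<String, Object>singletonMap("rating", videoId % 5));
    }

    /** The 'zip' step: merge the three per-video results into a single Map. */
    static CompletableFuture<Map<String, Object>> videoData(int videoId) {
        return metadata(videoId)
                .thenCombine(bookmark(videoId), (m, b) -> {
                    Map<String, Object> out = new LinkedHashMap<>();
                    out.put("id", videoId);
                    out.putAll(m);
                    out.putAll(b);
                    return out;
                })
                .thenCombine(rating(videoId), (mb, r) -> {
                    mb.putAll(r); // mb is the fresh map created in the previous step
                    return mb;
                });
    }

    /** Takes the first `limit` ids, starts all nested calls, then waits for one Map per video. */
    static List<Map<String, Object>> getVideos(List<Integer> videoIds, int limit) {
        List<CompletableFuture<Map<String, Object>>> pending = videoIds.stream()
                .limit(limit)
                .map(VideoPipelineSketch::videoData)
                .collect(Collectors.toList()); // collecting first starts every call before any join
        return pending.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }
}
```

    Collecting the futures before joining them is what lets the per-video calls overlap instead of running one video at a time.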
  • 78.
    Client code treats all interactions with the API as asynchronous.
    The API implementation chooses whether something is blocking or non-blocking and what resources it uses.
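
    One way to picture this split is an interface whose methods always return an asynchronous type, with implementations free to answer from memory on the calling thread or to use a thread pool. The sketch below uses CompletableFuture rather than Observable, and `RatingService`, `CachedRatings`, `RemoteRatings` and `describe` are hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Same asynchronous contract, two different execution strategies; all names are hypothetical.
class AsyncFacadeSketch {

    /** Client-facing contract: always asynchronous in shape. */
    interface RatingService {
        CompletableFuture<Integer> getRating(String videoId);
    }

    /** Answers from an in-memory map on the caller's thread; uses no extra thread. */
    static class CachedRatings implements RatingService {
        private final Map<String, Integer> cache;

        CachedRatings(Map<String, Integer> seed) {
            this.cache = new HashMap<>(seed);
        }

        public CompletableFuture<Integer> getRating(String videoId) {
            return CompletableFuture.completedFuture(cache.getOrDefault(videoId, 0));
        }
    }

    /** Runs the (simulated) remote lookup on a thread pool supplied by the implementation. */
    static class RemoteRatings implements RatingService {
        private final ExecutorService pool;

        RemoteRatings(ExecutorService pool) {
            this.pool = pool;
        }

        public CompletableFuture<Integer> getRating(String videoId) {
            // The rating value is made up: the id length stands in for a network result.
            return CompletableFuture.supplyAsync(() -> videoId.length(), pool);
        }
    }

    /** Client code: identical for either implementation. join() is used only to print a result. */
    static String describe(RatingService service, String videoId) {
        return service.getRating(videoId)
                .thenApply(r -> videoId + " rated " + r)
                .join();
    }

    public static void main(String[] args) {
        Map<String, Integer> seed = new HashMap<>();
        seed.put("v1", 4);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println(describe(new CachedRatings(seed), "v1"));
            System.out.println(describe(new RemoteRatings(pool), "v1"));
        } finally {
            pool.shutdown();
        }
    }
}
```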
  • 79.
    Example of latency reduction achieved by increasing the number of threads used by Observables.
  • 80.
    RxJava coming to Hystrix
    https://github.com/Netflix/Hystrix

    Observable<User> u = new GetUserCommand(id).observe();
    Observable<Geo> g = new GetGeoCommand(request).observe();

    Observable.zip(u, g, {user, geo ->
        return [username: user.getUsername(),
                currentLocation: geo.getCounty()]
    })
  • 81.
    To get started ...

    <dependency>
        <groupId>com.netflix.rxjava</groupId>
        <artifactId>rxjava-core</artifactId>
        <version>x.y.z</version>
    </dependency>

    ... or for a different language ...

    <dependency org="com.netflix.rxjava" name="rxjava-core" rev="x.y.z" />
    <dependency org="com.netflix.rxjava" name="rxjava-groovy" rev="x.y.z" />
    <dependency org="com.netflix.rxjava" name="rxjava-clojure" rev="x.y.z" />
    <dependency org="com.netflix.rxjava" name="rxjava-scala" rev="x.y.z" />
    <dependency org="com.netflix.rxjava" name="rxjava-jruby" rev="x.y.z" />
  • 82.
    Functional Reactive in the Netflix API with RxJava
    http://techblog.netflix.com/2013/02/rxjava-netflix-api.html

    Optimizing the Netflix API
    http://techblog.netflix.com/2013/01/optimizing-netflix-api.html

    RxJava
    https://github.com/Netflix/RxJava
    @RxJava

    Ben Christensen
    @benjchristensen
    http://www.linkedin.com/in/benjchristensen

    Netflix is Hiring
    http://jobs.netflix.com