REACTIVE FAULT TOLERANT PROGRAMMING with Hystrix and RxJava Matt Stine (@mstine)
Matt StineSenior Product Manager - Pivotal Software, Inc. Author of: http://bit.ly/cloud-native-book
© Copyright 2015 Pivotal. All rights reserved. 3 MICROSERVICES!!!!!
Typical Microservices Architecture ÂAPI ÂAPI ! ! ! µS µS µS µS µS
Trace a Request ÂAPI ÂAPI ! ! ! µS µS µS µS µS
Consider this subgraph… ÂAPI µS µS
public ResponseEntity<Foo> handleIncomingRequest() { Foo foo = new Foo(); foo.addPart(serviceOne.getContributionToFoo()); foo.addPart(serviceTwo.getContributionToFoo()); return new ResponseEntity<>(foo, HttpStatus.OK); } Block and wait!
Meanwhile in Service Two… ! ! µS µS µS
public ResponseEntity<FooPart> handleIncomingRequest() { FooPart fooPart = new FooPart(); fooPart.addSubPart(serviceThree.getContributionToFooPart()); fooPart.addSubPart(serviceFour.getContributionToFooPart()); return new ResponseEntity<>(fooPart, HttpStatus.OK); } Block and wait!
BUT DAT LATENCY THO
Futures! ExecutorService executor = createExecutorService(); public ResponseEntity<Foo> handleIncomingRequest() { Foo foo = new Foo(); Future<FooPart> partOne = executor.submit(new CallToServiceOne()); Future<FooPart> partTwo = executor.submit(new CallToServiceTwo()); foo.addPart(partOne.get()); foo.addPart(partTwo.get()); return new ResponseEntity<>(foo, HttpStatus.OK); }
The service graph changes… public ResponseEntity<Foo> handleIncomingRequest() { Foo foo = new Foo(); Future<FooPart> partOne = executor.submit(new CallToServiceOne()); Future<FooPart> partTwo = executor.submit( new CallToServiceTwo(partOne.get())); Future<FooPart> partThree = executor.submit(new CallToServiceThree()) foo.addPart(partTwo.get()); foo.addPart(partThree.get()); return new ResponseEntity<>(foo, HttpStatus.OK); } Block and wait! Blocked until two completes!
CompletableFutures public ResponseEntity<Foo> handleIncomingRequest() { Foo foo = new Foo(); CompletableFuture<FooPart> partTwo = CompletableFuture.supplyAsync( new CallToServiceOne()) .thenApplyAsync(new CallToServiceTwo()); CompletableFuture<FooPart> partThree = CompletableFuture.supplyAsync( new CallToServiceThree()); foo.addPart(partTwo.get()); foo.addPart(partThree.get()); return new ResponseEntity<>(foo, HttpStatus.OK); }
As composition becomes more complex, CompletableFuture API is lacking…
An API like this would be nice… List<Integer> transactionsIds = transactions.stream() .filter(t -> t.getType() == Transaction.GROCERY) .sorted(comparing(Transaction::getValue).reversed()) .map(Transaction::getId) .collect(toList());
Hold that thought…
RESPONSIVE
RESILIENT
ELASTIC
MESSAGE-DRIVEN
RxJava http://reactivex.io https://github.com/ReactiveX/RxJava
Observer Pattern
Hello Observable Observable<Integer> observableString = Observable.create( new Observable.OnSubscribe<Integer>() { public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); } }); Emit an item! I’m done emitting items!
Observable<Integer> observableString = Observable.create( subscriber -> { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); }); Hello Observable (JDK 8)
Hello Subscription Subscription subscription = observableString.subscribe( new Observer<Integer>() { public void onCompleted() { System.out.println("Observable completed"); } public void onError(Throwable throwable) { System.out.println("Oh noes! Something wrong happened!"); } public void onNext(Integer integer) { System.out.println("Item is " + integer); } });
Hello Subscription (JDK 8) Subscription subscription = observableString.subscribe( item -> System.out.println("Lambda says: " + item), throwable -> System.out.println("Lambda says: Oh noes! Something wrong happened!"), () -> System.out.println("Lambda says: Observable completed"));
All together now… Observable.create( subscriber -> { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); }).subscribe( item -> System.out.println("Lambda says: " + item), throwable -> System.out.println("Lambda says: Oh noes! Something wrong happened!"), () -> System.out.println("Lambda says: Observable completed"));
Back to Our Service Graph ÂAPI ÂAPI ! ! ! µS µS µS µS µS
With Callables Observable.fromCallable(new CallToServiceOne()) .flatMap(serviceOneResult -> Observable.fromCallable( new CallToServiceTwo(serviceOneResult))) .zipWith(Observable.fromCallable( new CallToServiceThree()), (FooPart resultFromServiceTwo, FooPart resultFromServiceThree) -> { Foo foo = new Foo(); foo.addPart(resultFromServiceTwo); foo.addPart(resultFromServiceThree); return foo; }).subscribe(System.out::println);
With Lambdas Observable.<FooPart>create(serviceOneSubscriber -> { serviceOneSubscriber.onNext(new FooPart("one")); serviceOneSubscriber.onCompleted(); }).flatMap(serviceOneResult -> Observable.<FooPart>create(serviceTwoSubscriber -> { serviceTwoSubscriber.onNext(new FooPart(serviceOneResult + " + two")); serviceTwoSubscriber.onCompleted(); })).zipWith(Observable.<FooPart>create(serviceThreeSubscriber -> { serviceThreeSubscriber.onNext(new FooPart("three")); serviceThreeSubscriber.onCompleted(); }), (resultFromServiceTwo, resultFromServiceThree) -> { Foo foo = new Foo(); foo.addPart(resultFromServiceTwo); foo.addPart(resultFromServiceThree); return foo; }).subscribe(System.out::println);
RxJava Operator Tour
Combining Observables • merge() - combine multiple Observables so they act like a single Observable • zip() - combine sets of items from multiple Observables via a Function, and emit the results https://github.com/ReactiveX/RxJava/wiki/Combining-Observables
Conditionals • doWhile() - emit Observable sequence, then repeat the sequence as long as the condition remains true • ifThen() - only emit Observable sequence if a condition is true, otherwise emit empty or default sequence • skipUntil() - discard emitted items from Observable until a second Observable emits an item, then emit the remainder • takeUntil() - emit items from Observable until a second Observable emits or notifies https://github.com/ReactiveX/RxJava/wiki/Conditional-and-Boolean-Operators
Boolean Operators • all() - do all the emitted items meet some criteria? • contains() - does the Observable emit a particular item? • exists() / isEmpty() - does an Observable emit items or not? • sequenceEqual() - do two Observables emit equal sequences? https://github.com/ReactiveX/RxJava/wiki/Conditional-and-Boolean-Operators
Filtering • filter() - filter Observable with a predicate • take(n) - emit only the first n items • skip(n) - ignore the first n items emitted • sample() - sample items according to a periodic interval https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables
Transforming • map() - apply a function to each emitted item • flatMap() - transform items emitted by Observable into Observables, then flatten • scan() - apply a function to each emitted item, then feedback result and repeat • buffer() - gather emitted items into bundles and emit the bundles https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables
Backpressure!
– https://github.com/ReactiveX/RxJava/wiki/Backpressure “…a quickly-producing Observable meets a slow-consuming observer.”
Backpressure Observable Observable.create(subscriber -> IntStream.iterate(0, i -> i + 2) .forEach(value -> subscriber.onNext(value))) .onBackpressureBuffer() .subscribe(new BackpressureSubscriber());
Backpressure Subscriber public class BackpressureSubscriber extends Subscriber<Object> { private int counter = 0; @Override public void onStart() { request(10); } @Override public void onNext(Object o) { if (counter < 9) { processItem(o); } else { processItem(o); resetCounter(); request(10); } } Please only give me this many! OK, I can handle more now.
Backpressure Subscriber private void processItem(Object o) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } counter++; System.out.println(counter + " : " + o); } private void resetCounter() { counter = 0; System.out.println("FETCH MORE"); } Simulate Processing Latency }
WHAT ABOUT FAILURES?
What happens if we cut here? ÂAPI ÂAPI ! ! ! µS µS µS µS µS
Or here? ÂAPI ÂAPI ! ! ! µS µS µS µS µS
Or here? ÂAPI ÂAPI ! ! ! µS µS µS µS µS
500 INTERNAL SERVER ERROR
Fault Tolerance at Netflix http://techblog.netflix.com/2012/02/fault-tolerance-in-high-volume.html
Without taking steps to ensure fault tolerance, 30 dependencies each with 99.99% uptime would result in 2+ hours downtime/ month (99.99%30 = 99.7% uptime = 2.16 hours/month). http://techblog.netflix.com/2012/02/fault-tolerance-in-high-volume.html
The Circuit Breaker Pattern
Circuit Breaker State Machine Closed on call / pass through call succeeds / reset count call fails / count failure threshold reached / trip breaker Open on call / fail on timeout / attempt reset Half-Open on call / pass through call succeeds / reset call fails / trip breaker trip breaker trip breaker attempt reset reset
https://github.com/Netflix/Hystrix
Hello Hystrix World public class CommandHelloWorld extends HystrixCommand<String> { private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // Do something that might fail... return "Hello " + name + "!"; } }
Synchronous String s = new CommandHelloWorld("Bob").execute(); Block and wait!
Asynchronous Future<String> s = new CommandHelloWorld("Bob").queue();
Reactive! Observable<String> s = new CommandHelloWorld("Bob").observe(); s.subscribe(val -> { // value emitted here });
Fail Fast public class CommandThatFailsFast extends HystrixCommand<String> { private final boolean throwException; public CommandThatFailsFast(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } } }
Fail Silently public class CommandThatFailsSilently extends HystrixCommand<String> { private final boolean throwException; public CommandThatFailsSilently(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsSilently"); } else { return "success"; } } @Override protected String getFallback() { return null; } } Fallback behavior! }
Static Fallback @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandWithStaticFallback"); } else { return "success"; } } @Override protected String getFallback() { return "fallback"; } Fallback behavior! }
Stubbed Fallback @Override protected UserAccount run() { // fetch UserAccount from remote service // return UserAccountClient.getAccount(customerId); throw new RuntimeException("forcing failure for example"); } @Override protected UserAccount getFallback() { /** * Return stubbed fallback with some static defaults, placeholders, * and an injected value 'countryCodeFromGeoLookup' that we'll use * instead of what we would have retrieved from the remote service. */ return new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false); } Fallback behavior! }
Fallback: Cache via Network
Primary + Secondary with Fallback
How Hystrix Works!
Hystrix Dashboard
A Failing Circuit
RxJava + Hystrix
With Callables Observable.fromCallable(new CallToServiceOne()) .flatMap(serviceOneResult -> Observable.fromCallable( new CallToServiceTwo(serviceOneResult))) .zipWith(Observable.fromCallable( new CallToServiceThree()), (FooPart resultFromServiceTwo, FooPart resultFromServiceThree) -> { Foo foo = new Foo(); foo.addPart(resultFromServiceTwo); foo.addPart(resultFromServiceThree); return foo; }).subscribe(System.out::println);
With HystrixObservableCommands new CallToServiceOneCommand("callToServiceOne").observe() .flatMap(serviceOneResult -> new CallToServiceTwoCommand("callToServiceTwo", serviceOneResult).observe()) .zipWith(new CallToServiceThreeCommand("callToServiceThree").observe(), (FooPart resultFromServiceTwo, FooPart resultFromServiceThree) -> { Foo foo = new Foo(); foo.addPart(resultFromServiceTwo); foo.addPart(resultFromServiceThree); return foo; }).subscribe(System.out::println);
Call to Service One public class CallToServiceOneCommand extends HystrixObservableCommand<FooPart> { public CallToServiceOneCommand(String name) { super(HystrixCommandGroupKey.Factory.asKey(name)); } @Override protected Observable<FooPart> construct() { return Observable.create(new Observable.OnSubscribe<FooPart>() { @Override public void call(Subscriber<? super FooPart> subscriber) { subscriber.onNext(new FooPart("one")); subscriber.onCompleted(); } }); } }
public class CallToServiceTwoCommand extends HystrixObservableCommand<FooPart> { private final FooPart dependencyFromServiceOne; private CallToServiceTwoCommand(String name, FooPart dependencyFromServiceOne) { super(HystrixCommandGroupKey.Factory.asKey(name)); this.dependencyFromServiceOne = dependencyFromServiceOne; } @Override protected Observable<FooPart> construct() { return Observable.create(new Observable.OnSubscribe<FooPart>() { @Override public void call(Subscriber<? super FooPart> subscriber) { subscriber.onNext(new FooPart(dependencyFromServiceOne + " + two")); subscriber.onCompleted(); } }); } } Call to Service Two
Spring Cloud http://cloud.spring.io
Hystrix with Spring Cloud @SpringBootApplication @EnableDiscoveryClient @EnableCircuitBreaker public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
Hystrix with Spring Cloud @Service @EnableConfigurationProperties(FortuneProperties.class) public class FortuneService { @Autowired FortuneProperties fortuneProperties; @Autowired RestTemplate restTemplate; @HystrixCommand(fallbackMethod = "fallbackFortune") public Fortune randomFortune() { return restTemplate.getForObject("http://fortunes/random", Fortune.class); } private Fortune fallbackFortune() { return new Fortune(42L, fortuneProperties.getFallbackFortune()); } } }
RxJava + Hystrix + Spring Cloud @HystrixCommand(fallbackMethod = "stubMovie") public Observable<Movie> getMovie(final String mlId) { return new ObservableResult<Movie>() { @Override public Movie invoke() { return restTemplate.getForObject( "http://springbox-catalog/movies/{mlId}", Movie.class, mlId); } }; } private Movie stubMovie(final String mlId) { Movie stub = new Movie(); stub.setMlId(mlId); stub.setTitle("Interesting...the wrong title. Sssshhhh!"); return stub; }
RxJava + Hystrix + Spring Cloud @RequestMapping("/movie/{mlId}") public Observable<MovieDetails> movieDetails(@PathVariable String mlId) { return Observable.zip( catalogIntegrationService.getMovie(mlId), reviewsIntegrationService.reviewsFor(mlId), recommendationsIntegrationService.getRecommendations(mlId), (movie, reviews, recommendations) -> { MovieDetails movieDetails = new MovieDetails(); movieDetails.setMlId(movie.getMlId()); movieDetails.setTitle(movie.getTitle()); movieDetails.setReviews(reviews); movieDetails.setRecommendations(recommendations); return movieDetails; } ); }
Reactive Landscape
Reactive Streams http://www.reactive-streams.org/
Project Reactor https://projectreactor.io/
JDK 9 Flow API http://download.java.net/jdk9/docs/api/java/util/concurrent/Flow.html
REACTIVE FAULT TOLERANT PROGRAMMING with Hystrix and RxJava Get your FREE eBook! http://bit.ly/cloud-native-book Matt StineSenior Product Manager - Pivotal Software, Inc.
 @mstine
 matt.stine@gmail.com
 http://mattstine.com

Reactive Fault Tolerant Programming with Hystrix and RxJava

  • 1.
    REACTIVE FAULT TOLERANTPROGRAMMING with Hystrix and RxJava Matt Stine (@mstine)
  • 2.
    Matt StineSenior ProductManager - Pivotal Software, Inc. Author of: http://bit.ly/cloud-native-book
  • 3.
    © Copyright 2015Pivotal. All rights reserved. 3 MICROSERVICES!!!!!
  • 4.
  • 5.
  • 6.
  • 7.
    public ResponseEntity<Foo> handleIncomingRequest(){ Foo foo = new Foo(); foo.addPart(serviceOne.getContributionToFoo()); foo.addPart(serviceTwo.getContributionToFoo()); return new ResponseEntity<>(foo, HttpStatus.OK); } Block and wait!
  • 8.
    Meanwhile in ServiceTwo… ! ! µS µS µS
  • 9.
    public ResponseEntity<FooPart> handleIncomingRequest(){ FooPart fooPart = new FooPart(); fooPart.addSubPart(serviceThree.getContributionToFooPart()); fooPart.addSubPart(serviceFour.getContributionToFooPart()); return new ResponseEntity<>(fooPart, HttpStatus.OK); } Block and wait!
  • 10.
  • 11.
    Futures! ExecutorService executor =createExecutorService(); public ResponseEntity<Foo> handleIncomingRequest() { Foo foo = new Foo(); Future<FooPart> partOne = executor.submit(new CallToServiceOne()); Future<FooPart> partTwo = executor.submit(new CallToServiceTwo()); foo.addPart(partOne.get()); foo.addPart(partTwo.get()); return new ResponseEntity<>(foo, HttpStatus.OK); }
  • 12.
    The service graphchanges… public ResponseEntity<Foo> handleIncomingRequest() { Foo foo = new Foo(); Future<FooPart> partOne = executor.submit(new CallToServiceOne()); Future<FooPart> partTwo = executor.submit( new CallToServiceTwo(partOne.get())); Future<FooPart> partThree = executor.submit(new CallToServiceThree()) foo.addPart(partTwo.get()); foo.addPart(partThree.get()); return new ResponseEntity<>(foo, HttpStatus.OK); } Block and wait! Blocked until two completes!
  • 13.
    CompletableFutures public ResponseEntity<Foo> handleIncomingRequest(){ Foo foo = new Foo(); CompletableFuture<FooPart> partTwo = CompletableFuture.supplyAsync( new CallToServiceOne()) .thenApplyAsync(new CallToServiceTwo()); CompletableFuture<FooPart> partThree = CompletableFuture.supplyAsync( new CallToServiceThree()); foo.addPart(partTwo.get()); foo.addPart(partThree.get()); return new ResponseEntity<>(foo, HttpStatus.OK); }
  • 14.
    As composition becomesmore complex, CompletableFuture API is lacking…
  • 15.
    An API likethis would be nice… List<Integer> transactionsIds = transactions.stream() .filter(t -> t.getType() == Transaction.GROCERY) .sorted(comparing(Transaction::getValue).reversed()) .map(Transaction::getId) .collect(toList());
  • 16.
  • 18.
  • 19.
  • 20.
  • 21.
  • 23.
  • 24.
  • 25.
    Hello Observable Observable<Integer> observableString= Observable.create( new Observable.OnSubscribe<Integer>() { public void call(Subscriber<? super Integer> subscriber) { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); } }); Emit an item! I’m done emitting items!
  • 26.
    Observable<Integer> observableString =Observable.create( subscriber -> { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); }); Hello Observable (JDK 8)
  • 27.
    Hello Subscription Subscription subscription= observableString.subscribe( new Observer<Integer>() { public void onCompleted() { System.out.println("Observable completed"); } public void onError(Throwable throwable) { System.out.println("Oh noes! Something wrong happened!"); } public void onNext(Integer integer) { System.out.println("Item is " + integer); } });
  • 28.
    Hello Subscription (JDK8) Subscription subscription = observableString.subscribe( item -> System.out.println("Lambda says: " + item), throwable -> System.out.println("Lambda says: Oh noes! Something wrong happened!"), () -> System.out.println("Lambda says: Observable completed"));
  • 29.
    All together now… Observable.create( subscriber-> { for (int i = 0; i < 5; i++) { subscriber.onNext(i); } subscriber.onCompleted(); }).subscribe( item -> System.out.println("Lambda says: " + item), throwable -> System.out.println("Lambda says: Oh noes! Something wrong happened!"), () -> System.out.println("Lambda says: Observable completed"));
  • 30.
    Back to OurService Graph ÂAPI ÂAPI ! ! ! µS µS µS µS µS
  • 31.
    With Callables Observable.fromCallable(new CallToServiceOne()) .flatMap(serviceOneResult-> Observable.fromCallable( new CallToServiceTwo(serviceOneResult))) .zipWith(Observable.fromCallable( new CallToServiceThree()), (FooPart resultFromServiceTwo, FooPart resultFromServiceThree) -> { Foo foo = new Foo(); foo.addPart(resultFromServiceTwo); foo.addPart(resultFromServiceThree); return foo; }).subscribe(System.out::println);
  • 32.
    With Lambdas Observable.<FooPart>create(serviceOneSubscriber ->{ serviceOneSubscriber.onNext(new FooPart("one")); serviceOneSubscriber.onCompleted(); }).flatMap(serviceOneResult -> Observable.<FooPart>create(serviceTwoSubscriber -> { serviceTwoSubscriber.onNext(new FooPart(serviceOneResult + " + two")); serviceTwoSubscriber.onCompleted(); })).zipWith(Observable.<FooPart>create(serviceThreeSubscriber -> { serviceThreeSubscriber.onNext(new FooPart("three")); serviceThreeSubscriber.onCompleted(); }), (resultFromServiceTwo, resultFromServiceThree) -> { Foo foo = new Foo(); foo.addPart(resultFromServiceTwo); foo.addPart(resultFromServiceThree); return foo; }).subscribe(System.out::println);
  • 33.
  • 34.
    Combining Observables • merge()- combine multiple Observables so they act like a single Observable • zip() - combine sets of items from multiple Observables via a Function, and emit the results https://github.com/ReactiveX/RxJava/wiki/Combining-Observables
  • 35.
    Conditionals • doWhile() -emit Observable sequence, then repeat the sequence as long as the condition remains true • ifThen() - only emit Observable sequence if a condition is true, otherwise emit empty or default sequence • skipUntil() - discard emitted items from Observable until a second Observable emits an item, then emit the remainder • takeUntil() - emit items from Observable until a second Observable emits or notifies https://github.com/ReactiveX/RxJava/wiki/Conditional-and-Boolean-Operators
  • 36.
    Boolean Operators • all()- do all the emitted items meet some criteria? • contains() - does the Observable emit a particular item? • exists() / isEmpty() - does an Observable emit items or not? • sequenceEqual() - do two Observables emit equal sequences? https://github.com/ReactiveX/RxJava/wiki/Conditional-and-Boolean-Operators
  • 37.
    Filtering • filter() -filter Observable with a predicate • take(n) - emit only the first n items • skip(n) - ignore the first n items emitted • sample() - sample items according to a periodic interval https://github.com/ReactiveX/RxJava/wiki/Filtering-Observables
  • 38.
    Transforming • map() -apply a function to each emitted item • flatMap() - transform items emitted by Observable into Observables, then flatten • scan() - apply a function to each emitted item, then feedback result and repeat • buffer() - gather emitted items into bundles and emit the bundles https://github.com/ReactiveX/RxJava/wiki/Transforming-Observables
  • 39.
  • 40.
  • 41.
    Backpressure Observable Observable.create(subscriber ->IntStream.iterate(0, i -> i + 2) .forEach(value -> subscriber.onNext(value))) .onBackpressureBuffer() .subscribe(new BackpressureSubscriber());
  • 42.
    Backpressure Subscriber public classBackpressureSubscriber extends Subscriber<Object> { private int counter = 0; @Override public void onStart() { request(10); } @Override public void onNext(Object o) { if (counter < 9) { processItem(o); } else { processItem(o); resetCounter(); request(10); } } Please only give me this many! OK, I can handle more now.
  • 43.
    Backpressure Subscriber private voidprocessItem(Object o) { try { Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } counter++; System.out.println(counter + " : " + o); } private void resetCounter() { counter = 0; System.out.println("FETCH MORE"); } Simulate Processing Latency }
  • 44.
  • 45.
    What happens ifwe cut here? ÂAPI ÂAPI ! ! ! µS µS µS µS µS
  • 46.
  • 47.
  • 48.
  • 49.
    Fault Tolerance atNetflix http://techblog.netflix.com/2012/02/fault-tolerance-in-high-volume.html
  • 50.
    Without taking stepsto ensure fault tolerance, 30 dependencies each with 99.99% uptime would result in 2+ hours downtime/ month (99.99%30 = 99.7% uptime = 2.16 hours/month). http://techblog.netflix.com/2012/02/fault-tolerance-in-high-volume.html
  • 51.
  • 52.
    Circuit Breaker StateMachine Closed on call / pass through call succeeds / reset count call fails / count failure threshold reached / trip breaker Open on call / fail on timeout / attempt reset Half-Open on call / pass through call succeeds / reset call fails / trip breaker trip breaker trip breaker attempt reset reset
  • 53.
  • 54.
    Hello Hystrix World publicclass CommandHelloWorld extends HystrixCommand<String> { private final String name; public CommandHelloWorld(String name) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.name = name; } @Override protected String run() { // Do something that might fail... return "Hello " + name + "!"; } }
  • 55.
    Synchronous String s =new CommandHelloWorld("Bob").execute(); Block and wait!
  • 56.
    Asynchronous Future<String> s =new CommandHelloWorld("Bob").queue();
  • 57.
    Reactive! Observable<String> s =new CommandHelloWorld("Bob").observe(); s.subscribe(val -> { // value emitted here });
  • 58.
    Fail Fast public classCommandThatFailsFast extends HystrixCommand<String> { private final boolean throwException; public CommandThatFailsFast(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsFast"); } else { return "success"; } } }
  • 59.
    Fail Silently public classCommandThatFailsSilently extends HystrixCommand<String> { private final boolean throwException; public CommandThatFailsSilently(boolean throwException) { super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); this.throwException = throwException; } @Override protected String run() { if (throwException) { throw new RuntimeException("failure from CommandThatFailsSilently"); } else { return "success"; } } @Override protected String getFallback() { return null; } } Fallback behavior! }
  • 60.
    Static Fallback @Override protected Stringrun() { if (throwException) { throw new RuntimeException("failure from CommandWithStaticFallback"); } else { return "success"; } } @Override protected String getFallback() { return "fallback"; } Fallback behavior! }
  • 61.
    Stubbed Fallback @Override protected UserAccountrun() { // fetch UserAccount from remote service // return UserAccountClient.getAccount(customerId); throw new RuntimeException("forcing failure for example"); } @Override protected UserAccount getFallback() { /** * Return stubbed fallback with some static defaults, placeholders, * and an injected value 'countryCodeFromGeoLookup' that we'll use * instead of what we would have retrieved from the remote service. */ return new UserAccount(customerId, "Unknown Name", countryCodeFromGeoLookup, true, true, false); } Fallback behavior! }
  • 62.
  • 63.
    Primary + Secondarywith Fallback
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
    With Callables Observable.fromCallable(new CallToServiceOne()) .flatMap(serviceOneResult-> Observable.fromCallable( new CallToServiceTwo(serviceOneResult))) .zipWith(Observable.fromCallable( new CallToServiceThree()), (FooPart resultFromServiceTwo, FooPart resultFromServiceThree) -> { Foo foo = new Foo(); foo.addPart(resultFromServiceTwo); foo.addPart(resultFromServiceThree); return foo; }).subscribe(System.out::println);
  • 69.
    With HystrixObservableCommands new CallToServiceOneCommand("callToServiceOne").observe() .flatMap(serviceOneResult-> new CallToServiceTwoCommand("callToServiceTwo", serviceOneResult).observe()) .zipWith(new CallToServiceThreeCommand("callToServiceThree").observe(), (FooPart resultFromServiceTwo, FooPart resultFromServiceThree) -> { Foo foo = new Foo(); foo.addPart(resultFromServiceTwo); foo.addPart(resultFromServiceThree); return foo; }).subscribe(System.out::println);
  • 70.
    Call to ServiceOne public class CallToServiceOneCommand extends HystrixObservableCommand<FooPart> { public CallToServiceOneCommand(String name) { super(HystrixCommandGroupKey.Factory.asKey(name)); } @Override protected Observable<FooPart> construct() { return Observable.create(new Observable.OnSubscribe<FooPart>() { @Override public void call(Subscriber<? super FooPart> subscriber) { subscriber.onNext(new FooPart("one")); subscriber.onCompleted(); } }); } }
  • 71.
    public class CallToServiceTwoCommandextends HystrixObservableCommand<FooPart> { private final FooPart dependencyFromServiceOne; private CallToServiceTwoCommand(String name, FooPart dependencyFromServiceOne) { super(HystrixCommandGroupKey.Factory.asKey(name)); this.dependencyFromServiceOne = dependencyFromServiceOne; } @Override protected Observable<FooPart> construct() { return Observable.create(new Observable.OnSubscribe<FooPart>() { @Override public void call(Subscriber<? super FooPart> subscriber) { subscriber.onNext(new FooPart(dependencyFromServiceOne + " + two")); subscriber.onCompleted(); } }); } } Call to Service Two
  • 72.
  • 73.
    Hystrix with SpringCloud @SpringBootApplication @EnableDiscoveryClient @EnableCircuitBreaker public class Application { public static void main(String[] args) { SpringApplication.run(Application.class, args); } }
  • 74.
    Hystrix with SpringCloud @Service @EnableConfigurationProperties(FortuneProperties.class) public class FortuneService { @Autowired FortuneProperties fortuneProperties; @Autowired RestTemplate restTemplate; @HystrixCommand(fallbackMethod = "fallbackFortune") public Fortune randomFortune() { return restTemplate.getForObject("http://fortunes/random", Fortune.class); } private Fortune fallbackFortune() { return new Fortune(42L, fortuneProperties.getFallbackFortune()); } } }
  • 75.
    RxJava + Hystrix+ Spring Cloud @HystrixCommand(fallbackMethod = "stubMovie") public Observable<Movie> getMovie(final String mlId) { return new ObservableResult<Movie>() { @Override public Movie invoke() { return restTemplate.getForObject( "http://springbox-catalog/movies/{mlId}", Movie.class, mlId); } }; } private Movie stubMovie(final String mlId) { Movie stub = new Movie(); stub.setMlId(mlId); stub.setTitle("Interesting...the wrong title. Sssshhhh!"); return stub; }
  • 76.
    RxJava + Hystrix+ Spring Cloud @RequestMapping("/movie/{mlId}") public Observable<MovieDetails> movieDetails(@PathVariable String mlId) { return Observable.zip( catalogIntegrationService.getMovie(mlId), reviewsIntegrationService.reviewsFor(mlId), recommendationsIntegrationService.getRecommendations(mlId), (movie, reviews, recommendations) -> { MovieDetails movieDetails = new MovieDetails(); movieDetails.setMlId(movie.getMlId()); movieDetails.setTitle(movie.getTitle()); movieDetails.setReviews(reviews); movieDetails.setRecommendations(recommendations); return movieDetails; } ); }
  • 77.
  • 78.
  • 79.
  • 80.
    JDK 9 FlowAPI http://download.java.net/jdk9/docs/api/java/util/concurrent/Flow.html
  • 81.
    REACTIVE FAULT TOLERANTPROGRAMMING with Hystrix and RxJava Get your FREE eBook! http://bit.ly/cloud-native-book Matt StineSenior Product Manager - Pivotal Software, Inc.
 @mstine
 matt.stine@gmail.com
 http://mattstine.com