This document discusses Akka Streams and Reactive Streams. It walks through an Akka Streams application that exposes an HTTP service serving an infinite stream of numbers, and shows how Akka Streams provides asynchronous stream processing with back pressure to prevent out-of-memory errors.
What's in the toolkit?
Actors – simple & high performance concurrency
Cluster / Remoting, Cluster tools – location transparency, resilience – and prepackaged tools for building distributed systems
Streams – back-pressured stream processing
Persistence – CQRS + Event Sourcing for Actors
HTTP – fully async streaming HTTP Server
Complete Java & Scala APIs for all features

Reactive Streams timeline
Oct 2013: RxJava, Akka and Twitter people meeting
"Soon thereafter" 2013: Reactive Streams Expert group formed
Apr 2015: Reactive Streams Spec 1.0, TCK, 5+ impls (Akka Streams, RxJava, Vert.x, MongoDB, …)
Jul 2015: Akka Streams 1.0
2017: Inclusion in JDK9
Now/Future: JEP 321: HTTP Client; ADBA - Async JDBC; Jakarta EE?

Reactive Streams
"Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols." (http://www.reactive-streams.org)

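To make "back pressure" concrete, here is a minimal sketch using the `java.util.concurrent.Flow` interfaces that JDK 9 adopted from Reactive Streams. It is not Akka Streams code, and the names `BackPressureSketch`, `OneAtATimeSubscriber` and `runDemo` are made up for illustration. The subscriber signals demand one element at a time through `request(1)`, so the publisher cannot run ahead of it by more than its buffer. Note that `SubmissionPublisher.submit` blocks the producing thread when that buffer is full, which is a simplification compared with the non-blocking back pressure the quote describes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class BackPressureSketch {

    // Hypothetical subscriber: asks for one element, handles it, asks for the next.
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // initial demand: exactly one element
        }

        @Override
        public void onNext(Integer item) {
            received.add(item);      // "process" the element
            subscription.request(1); // only now signal demand for one more
        }

        @Override
        public void onError(Throwable t) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    // Publishes 0 until count (exclusive) and returns what the subscriber saw, in order.
    static List<Integer> runDemo(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (int i = 0; i < count; i++) {
                publisher.submit(i); // blocks if the subscriber's buffer is full
            }
        } // close() signals onComplete once the buffered elements are delivered
        try {
            if (!subscriber.done.await(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("stream did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(runDemo(5)); // prints [0, 1, 2, 3, 4]
    }
}
```

The important part is that demand flows upstream (`request`) while data flows downstream (`onNext`); a slow subscriber simply requests less often.
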
Akka Streams in ~20 seconds:

    final ActorSystem system = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(system);

    final Source<Integer, NotUsed> source =
        Source.range(0, 20000000);

    final Flow<Integer, String, NotUsed> flow =
        Flow.fromFunction((Integer n) -> n.toString());

    final Sink<String, CompletionStage<Done>> sink =
        Sink.foreach(str -> System.out.println(str));

    final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);

    runnable.run(materializer);

Akka Streams in ~20 seconds, as a diagram: Source (range) emits Integer, Flow (fromFunction) turns Integer into String, Sink (foreach) consumes String.
Akka Streams in ~20 seconds:

    implicit val system = ActorSystem()
    implicit val mat = ActorMaterializer()

    val source = Source(0 to 20000000)

    val flow = Flow[Int].map(_.toString())

    val sink = Sink.foreach[String](println(_))

    val runnable = source.via(flow).to(sink)

    runnable.run()

Akka Streams in ~20 seconds:

    Source(0 to 20000000)
      .map(_.toString)
      .runForeach(println)

Numbers as a service

    final ActorSystem system = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(system);
    final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 9000);
    final Http http = Http.get(system);

    // Note that a more realistic solution would use the EntityStreaming API to stream elements
    // as for example JSON Streaming (see the docs for more details)
    final Source<ByteString, NotUsed> numbers =
        Source.range(0, Integer.MAX_VALUE)
            // one number per line in the response body
            .map(n -> ByteString.fromString(n.toString() + "\n"));

    final Route route =
        path("numbers", () ->
            get(() ->
                complete(HttpResponse.create()
                    .withStatus(StatusCodes.OK)
                    .withEntity(HttpEntities.create(
                        ContentTypes.TEXT_PLAIN_UTF8,
                        numbers
                    )))
            )
        );

    final CompletionStage<ServerBinding> bindingCompletionStage =
        http.bindAndHandle(route.flow(system, materializer), host, materializer);

    bindingCompletionStage.whenComplete((binding, exception) -> {
        if (binding != null) {
            final InetSocketAddress address = binding.localAddress();
            System.out.println("Akka HTTP server running at " +
                address.getHostString() + ":" + address.getPort());
        } else {
            System.out.println("Failed to bind HTTP server: " + exception.getMessage());
            exception.printStackTrace();
        }
    });

Numbers as a service

    implicit val system = ActorSystem()
    import system.dispatcher
    implicit val mat = ActorMaterializer()

    // one number per line in the response body
    val numbers =
      Source(0 to Int.MaxValue).map(n => ByteString(n + "\n"))

    val route =
      path("numbers") {
        get {
          complete(
            HttpResponse(entity = HttpEntity(`text/plain(UTF-8)`, numbers))
          )
        }
      }

    val futureBinding = Http().bindAndHandle(route, "127.0.0.1", 8080)

    futureBinding.onComplete {
      case Success(binding) =>
        val address = binding.localAddress
        println(s"Akka HTTP server running at ${address.getHostString}:${address.getPort}")
      case Failure(ex) =>
        println(s"Failed to bind HTTP server: ${ex.getMessage}")
        ex.printStackTrace()
    }

A more useful example (credit to: Colin Breck)

    final ActorSystem system = ActorSystem.create();
    final Materializer materializer = ActorMaterializer.create(system);
    final Database database = new Database(system);
    final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 8080);
    final Http http = Http.get(system);

    final Flow<Message, Message, NotUsed> measurementsFlow =
        Flow.of(Message.class)
            .flatMapConcat((Message message) ->
                // handles both strict and streamed ws messages by folding
                // the latter into a single string (in memory)
                message.asTextMessage()
                    .getStreamedText()
                    .fold("", (acc, elem) -> acc + elem)
            )
            // emit a batch after 1000 elements or 1 second, whichever comes first
            .groupedWithin(1000, Duration.of(1, ChronoUnit.SECONDS))
            // at most 5 bulk inserts in flight at a time
            .mapAsync(5, database::asyncBulkInsert)
            .map(written ->
                TextMessage.create(
                    "wrote up to: " + written.get(written.size() - 1)
                )
            );

    final Route route =
        path("measurements", () ->
            get(() ->
                handleWebSocketMessages(measurementsFlow)
            )
        );

    final CompletionStage<ServerBinding> bindingCompletionStage =
        http.bindAndHandle(route.flow(system, materializer), host, materializer);

    bindingCompletionStage.whenComplete((binding, exception) -> {
        if (binding != null) {
            final InetSocketAddress address = binding.localAddress();
            System.out.println("Akka HTTP server running at " +
                address.getHostString() + ":" + address.getPort());
        } else {
            System.out.println("Failed to bind HTTP server: " + exception.getMessage());
            exception.printStackTrace();
        }
    });

A more useful example

    val measurementsFlow =
      Flow[Message].flatMapConcat(message =>
        message.asTextMessage.getStreamedText.fold("")(_ + _)
      )
      .groupedWithin(1000, 1.second)
      .mapAsync(5)(Database.asyncBulkInsert)
      .map(written => TextMessage("wrote up to: " + written.last))

    val route =
      path("measurements") {
        get {
          handleWebSocketMessages(measurementsFlow)
        }
      }

    val futureBinding = Http().bindAndHandle(route, "127.0.0.1", 8080)

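As a rough illustration of what the batching plus `mapAsync(5, ...)` combination buys you, here is a plain JDK sketch, not Akka code. It splits measurements into batches of a fixed size and keeps at most `parallelism` asynchronous inserts in flight, returning results in batch order. Everything here is an assumption made for the example: `BoundedBulkInsertSketch`, `insertAll`, and the `asyncBulkInsert` stand-in, which only reports the batch size. It ignores the time window of `groupedWithin`, and it blocks the calling thread while waiting for a free slot, whereas the stream version back-pressures without blocking.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;

class BoundedBulkInsertSketch {

    // Hypothetical stand-in for database::asyncBulkInsert:
    // pretends to write the batch and reports how many rows were written.
    static CompletableFuture<Integer> asyncBulkInsert(List<String> batch, Executor db) {
        return CompletableFuture.supplyAsync(batch::size, db);
    }

    // Groups measurements into batches and runs at most `parallelism` inserts at once.
    static List<Integer> insertAll(List<String> measurements, int batchSize, int parallelism) {
        ExecutorService db = Executors.newFixedThreadPool(parallelism);
        Semaphore permits = new Semaphore(parallelism);
        List<CompletableFuture<Integer>> results = new ArrayList<>();
        try {
            for (int from = 0; from < measurements.size(); from += batchSize) {
                int to = Math.min(from + batchSize, measurements.size());
                List<String> batch = measurements.subList(from, to);
                // Wait here while `parallelism` inserts are still running.
                permits.acquireUninterruptibly();
                results.add(asyncBulkInsert(batch, db)
                    .whenComplete((written, error) -> permits.release()));
            }
            // Collect results in the order the batches were submitted.
            List<Integer> written = new ArrayList<>();
            for (CompletableFuture<Integer> result : results) {
                written.add(result.join());
            }
            return written;
        } finally {
            db.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> measurements = Collections.nCopies(2500, "measurement");
        System.out.println(insertAll(measurements, 1000, 5)); // prints [1000, 1000, 500]
    }
}
```

The semaphore plays the role of the `5` in `mapAsync(5, ...)`: the producer cannot start a sixth insert until one of the running ones has finished.
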
The tale of the two pancake chefs
Diagram: BatterSource (scoops of batter) feeds the Frying Pan, which feeds pancakes to the HungrySink (nom nom nom), with asynchronous boundaries between the stages. The two chefs are Roland and Patrik.

Roland's pipelined pancakes (section in docs)

    // Takes a scoop of batter and creates a pancake with one side cooked
    Flow<ScoopOfBatter, HalfCookedPancake, NotUsed> fryingPan1 =
        Flow.of(ScoopOfBatter.class).map(batter -> new HalfCookedPancake());

    // Finishes a half-cooked pancake
    Flow<HalfCookedPancake, Pancake, NotUsed> fryingPan2 =
        Flow.of(HalfCookedPancake.class).map(halfCooked -> new Pancake());

    // With the two frying pans we can fully cook pancakes
    Flow<ScoopOfBatter, Pancake, NotUsed> pancakeChef =
        fryingPan1.async().via(fryingPan2.async());

Roland's pipelined pancakes (section in docs)

    // Takes a scoop of batter and creates a pancake with one side cooked
    val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, NotUsed] =
      Flow[ScoopOfBatter].map { batter => HalfCookedPancake() }

    // Finishes a half-cooked pancake
    val fryingPan2: Flow[HalfCookedPancake, Pancake, NotUsed] =
      Flow[HalfCookedPancake].map { halfCooked => Pancake() }

    // With the two frying pans we can fully cook pancakes
    val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
      Flow[ScoopOfBatter].via(fryingPan1.async).via(fryingPan2.async)

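The idea behind the two `async` boundaries can be sketched with plain JDK classes. This is an analogy, not how Akka Streams is implemented: each frying pan gets its own single-threaded executor, so the first pan can start on the next scoop while the second pan finishes the previous pancake. The class name `PipelinedPancakesSketch`, the `cook` method and the `id` fields are assumptions added for the example, and unlike a real stream this sketch has no back pressure, because the executors queue work without bound.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PipelinedPancakesSketch {

    // Minimal stand-ins for the slide's types; the id field is added for illustration.
    static final class ScoopOfBatter {
        final int id;
        ScoopOfBatter(int id) { this.id = id; }
    }

    static final class HalfCookedPancake {
        final int id;
        HalfCookedPancake(int id) { this.id = id; }
    }

    static final class Pancake {
        final int id;
        Pancake(int id) { this.id = id; }
    }

    // Cooks the given number of scoops through two pans that work concurrently.
    static List<Pancake> cook(int scoops) {
        // One thread per pan: each stage runs on its own side of an asynchronous boundary.
        ExecutorService fryingPan1 = Executors.newSingleThreadExecutor();
        ExecutorService fryingPan2 = Executors.newSingleThreadExecutor();
        try {
            List<CompletableFuture<Pancake>> inFlight = new ArrayList<>();
            for (int i = 0; i < scoops; i++) {
                ScoopOfBatter batter = new ScoopOfBatter(i);
                inFlight.add(CompletableFuture
                    // first side is cooked on pan 1
                    .supplyAsync(() -> new HalfCookedPancake(batter.id), fryingPan1)
                    // second side is cooked on pan 2 while pan 1 takes the next scoop
                    .thenApplyAsync(halfCooked -> new Pancake(halfCooked.id), fryingPan2));
            }
            // Collect the pancakes in the order the scoops were poured.
            List<Pancake> pancakes = new ArrayList<>();
            for (CompletableFuture<Pancake> pancake : inFlight) {
                pancakes.add(pancake.join());
            }
            return pancakes;
        } finally {
            fryingPan1.shutdown();
            fryingPan2.shutdown();
        }
    }

    public static void main(String[] args) {
        for (Pancake pancake : cook(3)) {
            System.out.println("pancake " + pancake.id);
        }
    }
}
```

Pipelining pays off when both stages take a similar amount of time; if one pan is much slower, it becomes the bottleneck and the other pan mostly waits.
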
GraphStage API (complete sources on github)

    public class Map<A, B> extends GraphStage<FlowShape<A, B>> {

        private final Function<A, B> f;
        public final Inlet<A> in = Inlet.create("Map.in");
        public final Outlet<B> out = Outlet.create("Map.out");
        private final FlowShape<A, B> shape = FlowShape.of(in, out);

        public Map(Function<A, B> f) {
            this.f = f;
        }

        public FlowShape<A, B> shape() {
            return shape;
        }

        public GraphStageLogic createLogic(Attributes inheritedAttributes) {
            return new GraphStageLogic(shape) {
                {
                    // upstream pushed an element: transform it and push it downstream
                    setHandler(in, new AbstractInHandler() {
                        public void onPush() throws Exception {
                            push(out, f.apply(grab(in)));
                        }
                    });
                    // downstream signalled demand: forward it upstream
                    setHandler(out, new AbstractOutHandler() {
                        public void onPull() throws Exception {
                            pull(in);
                        }
                    });
                }
            };
        }
    }

GraphStage API

    class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] {

      val in = Inlet[A]("Map.in")
      val out = Outlet[B]("Map.out")
      val shape = FlowShape.of(in, out)

      def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          // upstream pushed an element: transform it and push it downstream
          setHandler(in, new InHandler {
            def onPush(): Unit = {
              push(out, f(grab(in)))
            }
          })
          // downstream signalled demand: forward it upstream
          setHandler(out, new OutHandler {
            def onPull(): Unit = {
              pull(in)
            }
          })
        }
    }

When to use what abstraction
Chart with axes "modelling power" and "complexity", plotting: actors, streams, (completable) futures, java.concurrency.

Actors ❤ Streams = True
Actors – perfect for managing state, things with lifecycles, restarting, running many separate instances in parallel
A running stream – can be seen as a state, has a lifecycle (not yet started, running, failed, stopped), can be stopped using special stages
Again, Colin Breck: blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-i/

But, distributed systems?
Akka Streams is a local abstraction. Combine with Akka Cluster for distributed superpowers!
Diagram: Shard A runs Stream-in-Actor A-1; Shard B runs Stream-in-Actor B-1 and Stream-in-Actor B-2; Shard C runs Stream-in-Actor C-1.

SourceRef sending side (Scala)

    import akka.pattern.pipe

    class SendingActor extends Actor {
      import context.dispatcher
      implicit val mat = ActorMaterializer()(context)

      val numbersSource = Source(1 to 100000)

      def receive = {
        case SendMeNumbers =>
          // materialize a reference to the source that can be sent to another ActorSystem
          val ref: Future[SourceRef[Int]] =
            numbersSource.runWith(StreamRefs.sourceRef())

          val reply: Future[SendMeNumbersReply] =
            ref.map(sourceRef => SendMeNumbersReply(sourceRef))

          reply pipeTo sender()
      }
    }

SourceRef receiving side (Scala)

    class ReceivingActor(sendingActor: ActorRef) extends Actor {
      // running the received source needs a materializer in scope
      implicit val mat = ActorMaterializer()(context)

      sendingActor ! SendMeNumbers

      override def receive: Receive = {
        case SendMeNumbersReply(sourceRef) =>
          sourceRef.runWith(Sink.foreach(println))
      }
    }

StreamRefs: new feature since Akka 2.5.10
By design NOT a "distributed automatic stream processing deployment and management framework" (which is how Spark, Flink or Beam could be described).
Ideal for combining Alpakka Enterprise Integration, Data Processing or Ingestion Pipelines between ActorSystems.

The community
Forums: discuss.akka.io
Public chat rooms:
http://gitter.im/akka/dev (developing Akka)
http://gitter.im/akka/akka (using Akka)
Easy to contribute tickets:
https://github.com/akka/akka/issues?q=is%3Aissue+is%3Aopen+label%3Aeasy-to-contribute
https://github.com/akka/akka/issues?q=is%3Aissue+is%3Aopen+label%3A%22nice-to-have+%28low-prio%29%22