Akka streams Johan Andrén JEEConf, Kiev, 2018-05-19 Reactive stream processing using
Johan Andrén Akka Team Stockholm Scala User Group @apnylle johan.andren@lightbend.com
Build powerful reactive, concurrent, and distributed applications more easily Akka
Actors – simple & high performance concurrency Cluster / Remoting, Cluster tools – location transparency, resilience – and prepackaged tools for building distributed systems Streams – back-pressured stream processing Persistence – CQRS + Event Sourcing for Actors HTTP – fully async streaming HTTP Server Complete Java & Scala APIs for all features What’s in the toolkit?
Reactive Streams reactive-streams.org
Reactive Streams timeline Oct 2013 RxJava, Akka and Twitter- people meeting “Soon thereafter” 2013 Reactive Streams Expert group formed Apr 2015 Reactive Streams Spec 1.0 TCK 5+ impls 2017 Inclusion in JDK9 Akka Streams, RxJava Vert.x, MongoDB, … Jul 2015 Akka Streams 1.0 Now/Future JEP 321: HTTP Client ADBA - Async JDBC Jakarta EE?
Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols http://www.reactive-streams.org “
Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols http://www.reactive-streams.org “
Stream processing Source Sink Flow
Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols http://www.reactive-streams.org “
Asynchronous stream processing Source Sink (possible) asynchronous boundaries Flow
Reactive Streams Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols http://www.reactive-streams.org “
No back pressure Source Sink 10 msg/s 1 msg/s Flow asynchronous boundary
No back pressure Source Sink 10 msg/s 1 msg/s Flow asynchronous boundary OOM!!
No back pressure - bounded buffer Source Sink 10 msg/s 1 msg/s Flow buffer size 6 🗑 asynchronous boundary
Async non blocking back pressure Source Sink 1 msg/s 1 msg/s Flow buffer size 6 🗑 asynchronous boundary Hey! give me 2 more
Reactive Streams RS Library A RS library B async boundary
Reactive Streams “Make building powerful concurrent & distributed applications simple.”
Complete and awesome Java and Scala APIs (Just like everything in Akka) Akka Streams
Akka Streams in ~20 seconds: final ActorSystem system = ActorSystem.create();
 final Materializer materializer = ActorMaterializer.create(system);
 
 final Source<Integer, NotUsed> source =
 Source.range(0, 20000000);
 
 final Flow<Integer, String, NotUsed> flow =
 Flow.fromFunction((Integer n) -> n.toString());
 
 final Sink<String, CompletionStage<Done>> sink =
 Sink.foreach(str -> System.out.println(str));
 
 final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);
 
 runnable.run(materializer);
Akka Streams in ~20 seconds: final ActorSystem system = ActorSystem.create();
 final Materializer materializer = ActorMaterializer.create(system);
 
 final Source<Integer, NotUsed> source =
 Source.range(0, 20000000);
 
 final Flow<Integer, String, NotUsed> flow =
 Flow.fromFunction((Integer n) -> n.toString());
 
 final Sink<String, CompletionStage<Done>> sink =
 Sink.foreach(str -> System.out.println(str));
 
 final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);
 
 runnable.run(materializer);
Akka Streams in ~20 seconds: final ActorSystem system = ActorSystem.create();
 final Materializer materializer = ActorMaterializer.create(system);
 
 final Source<Integer, NotUsed> source =
 Source.range(0, 20000000);
 
 final Flow<Integer, String, NotUsed> flow =
 Flow.fromFunction((Integer n) -> n.toString());
 
 final Sink<String, CompletionStage<Done>> sink =
 Sink.foreach(str -> System.out.println(str));
 
 final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);
 
 runnable.run(materializer);
final ActorSystem system = ActorSystem.create();
 final Materializer materializer = ActorMaterializer.create(system);
 
 final Source<Integer, NotUsed> source =
 Source.range(0, 20000000);
 
 final Flow<Integer, String, NotUsed> flow =
 Flow.fromFunction((Integer n) -> n.toString());
 
 final Sink<String, CompletionStage<Done>> sink =
 Sink.foreach(str -> System.out.println(str));
 
 final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);
 
 runnable.run(materializer); Akka Streams in ~20 seconds: Source (range) Integer Flow (fromFunction) Sink (foreach) Integer String String
Akka Streams in ~20 seconds: implicit val system = ActorSystem()
 implicit val mat = ActorMaterializer()
 
 val source = Source(0 to 20000000)
 
 val flow = Flow[Int].map(_.toString())
 
 val sink = Sink.foreach[String](println(_))
 
 val runnable = source.via(flow).to(sink)
 
 runnable.run()
Akka Streams in ~20 seconds: Source.range(0, 20000000)
 .map(Object::toString)
 .runForeach(str -> System.out.println(str), materializer);
Akka Streams in ~20 seconds: Source(0 to 20000000)
 .map(_.toString)
 .runForeach(println)
final ActorSystem system = ActorSystem.create(); final Materializer materializer = ActorMaterializer.create(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 9000); final Http http = Http.get(system); // Note that a more realistic solution would use the EntityStreaming API to stream elements // as for example JSON Streaming (see the docs for more details) final Source<ByteString, NotUsed> numbers = Source.range(0, Integer.MAX_VALUE) .map(n -> ByteString.fromString(n.toString() + "n")); final Route route = path("numbers", () -> get(() -> complete(HttpResponse.create() .withStatus(StatusCodes.OK) .withEntity(HttpEntities.create( ContentTypes.TEXT_PLAIN_UTF8, numbers ))) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); System.out.println("Akka HTTP server running at " + address.getHostString() + ":" + address.getPort()); } else { System.out.print("Failed to bind HTTP server: " + exception.getMessage()); exception.fillInStackTrace(); } }); Numbers as a service
final ActorSystem system = ActorSystem.create(); final Materializer materializer = ActorMaterializer.create(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 9000); final Http http = Http.get(system); // Note that a more realistic solution would use the EntityStreaming API to stream e // as for example JSON Streaming (see the docs for more details) final Source<ByteString, NotUsed> numbers = Source.range(0, Integer.MAX_VALUE) .map(n -> ByteString.fromString(n.toString() + "n")); final Route route = path("numbers", () -> get(() -> complete(HttpResponse.create() .withStatus(StatusCodes.OK) .withEntity(HttpEntities.create( ContentTypes.TEXT_PLAIN_UTF8, numbers ))) ) ); Numbers as a service
final Materializer materializer = ActorMaterializer.create(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 9000); final Http http = Http.get(system); // Note that a more realistic solution would use the EntityStreaming API to stream e // as for example JSON Streaming (see the docs for more details) final Source<ByteString, NotUsed> numbers = Source.range(0, Integer.MAX_VALUE) .map(n -> ByteString.fromString(n.toString() + "n")); final Route route = path("numbers", () -> get(() -> complete(HttpResponse.create() .withStatus(StatusCodes.OK) .withEntity(HttpEntities.create( ContentTypes.TEXT_PLAIN_UTF8, numbers ))) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); Numbers as a service
complete(HttpResponse.create() .withStatus(StatusCodes.OK) .withEntity(HttpEntities.create( ContentTypes.TEXT_PLAIN_UTF8, numbers ))) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); System.out.println("Akka HTTP server running at " + address.getHostString() + ": address.getPort()); } else { System.out.print("Failed to bind HTTP server: " + exception.getMessage()); exception.fillInStackTrace(); } }); Numbers as a service
Numbers as a service implicit val system = ActorSystem() import system.dispatcher implicit val mat = ActorMaterializer() val numbers = Source(0 to Int.MaxValue).map(n => ByteString(n + "n")) val route = path("numbers") { get { complete( HttpResponse(entity = HttpEntity(`text/plain(UTF-8)`, numbers)) ) } } val futureBinding = Http().bindAndHandle(route, "127.0.0.1", 8080) futureBinding.onComplete { case Success(binding) => val address = binding.localAddress println(s"Akka HTTP server running at ${address.getHostString}:${address.getPort}") case Failure(ex) => println(s"Failed to bind HTTP server: ${ex.getMessage}") ex.fillInStackTrace() }
recv buffer send buffer 🚚 🚚 🚚 🚚 🚚 🚚 🚚 Back pressure over TCP numbers TCP HTTP Server Client
recv buffer send buffer 🚚 🚚 🚚 🚚 🚚 🚚 🚑 Back pressure over TCP numbers TCP HTTP Backpressure Server Client
recv buffer send buffer 🚚 🚚 🚚 🚚 🚚 🚚 🚚 🚚 🚚 🚚 🚑 Back pressure over TCP numbers TCP HTTP Backpressure Backpressure Server Client
final ActorSystem system = ActorSystem.create(); final Materializer materializer = ActorMaterializer.create(system); final Database database = new Database(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 8080); final Http http = Http.get(system); final Flow<Message, Message, NotUsed> measurementsFlow = Flow.of(Message.class) .flatMapConcat((Message message) -> // handles both strict and streamed ws messages by folding // the later into a single string (in memory) message.asTextMessage() .getStreamedText() .fold("", (acc, elem) -> acc + elem) ) .groupedWithin(1000, Duration.of(1, ChronoUnit.SECONDS)) .mapAsync(5, database::asyncBulkInsert) .map(written -> TextMessage.create( "wrote up to: " + written.get(written.size() - 1) ) ); final Route route = path("measurements", () -> get(() -> handleWebSocketMessages(measurementsFlow) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); System.out.println("Akka HTTP server running at " + address.getHostString() + ":" + address.getPort()); } else { System.out.print("Failed to bind HTTP server: " + exception.getMessage()); exception.fillInStackTrace(); } }); A more useful example Credit to: Colin Breck
final Materializer materializer = ActorMaterializer.create(system); final Database database = new Database(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 8080); final Http http = Http.get(system); final Flow<Message, Message, NotUsed> measurementsFlow = Flow.of(Message.class) .flatMapConcat((Message message) -> // handles both strict and streamed ws messages by folding // the later into a single string (in memory) message.asTextMessage() .getStreamedText() .fold("", (acc, elem) -> acc + elem) ) .groupedWithin(1000, Duration.of(1, ChronoUnit.SECONDS)) .mapAsync(5, database::asyncBulkInsert) .map(written -> TextMessage.create( "wrote up to: " + written.get(written.size() - 1) ) ); final Route route = path("measurements", () -> get(() -> handleWebSocketMessages(measurementsFlow) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = A more useful example
final Flow<Message, Message, NotUsed> measurementsFlow = Flow.of(Message.class) .flatMapConcat((Message message) -> // handles both strict and streamed ws messages by folding // the later into a single string (in memory) message.asTextMessage() .getStreamedText() .fold("", (acc, elem) -> acc + elem) ) .groupedWithin(1000, Duration.of(1, ChronoUnit.SECONDS)) .mapAsync(5, database::asyncBulkInsert) .map(written -> TextMessage.create( "wrote up to: " + written.get(written.size() - 1) ) ); final Route route = path("measurements", () -> get(() -> handleWebSocketMessages(measurementsFlow) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); A more useful example
.map(written -> TextMessage.create( "wrote up to: " + written.get(written.size() - 1) ) ); final Route route = path("measurements", () -> get(() -> handleWebSocketMessages(measurementsFlow) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); System.out.println("Akka HTTP server running at " + address.getHostString() + ":" address.getPort()); } else { System.out.print("Failed to bind HTTP server: " + exception.getMessage()); exception.fillInStackTrace(); } }); A more useful example
A more useful example val measurementsFlow =
 Flow[Message].flatMapConcat(message =>
 message.asTextMessage.getStreamedText.fold("")(_ + _)
 )
 .groupedWithin(1000, 1.second)
 .mapAsync(5)(Database.asyncBulkInsert)
 .map(written => TextMessage("wrote up to: " + written.last))
 
 val route =
 path("measurements") {
 get {
 handleWebSocketMessages(measurementsFlow)
 }
 }
 
 val futureBinding = Http().bindAndHandle(route, "127.0.0.1", 8080)
The tale of the two pancake chefs HungrySink Frying Pan BatterSource Scoops of batter Pancakes nom nom nom asynchronous boundaries Roland Patrik
Rolands pipelined pancakes HungrySinkPan 2BatterSource Pan 1 nom nom nom
Rolands pipelined pancakes Flow<ScoopOfBatter, HalfCookedPancake, NotUsed> fryingPan1 =
 Flow.of(ScoopOfBatter.class).map(batter -> new HalfCookedPancake());
 
 Flow<HalfCookedPancake, Pancake, NotUsed> fryingPan2 =
 Flow.of(HalfCookedPancake.class).map(halfCooked -> new Pancake()); Flow<ScoopOfBatter, Pancake, NotUsed> pancakeChef =
 fryingPan1.async().via(fryingPan2.async()); section in docs
Rolands pipelined pancakes // Takes a scoop of batter and creates a pancake with one side cooked val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, NotUsed] =
 Flow[ScoopOfBatter].map { batter => HalfCookedPancake() }
 
 // Finishes a half-cooked pancake
 val fryingPan2: Flow[HalfCookedPancake, Pancake, NotUsed] =
 Flow[HalfCookedPancake].map { halfCooked => Pancake() } 
 // With the two frying pans we can fully cook pancakes val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
 Flow[ScoopOfBatter].via(fryingPan1.async).via(fryingPan2.async) section in docs
Patriks parallel pancakes HungrySink Pan 2 BatterSource Pan 1 Balance Merge nom nom nom
Patriks parallel pancakes Flow<ScoopOfBatter, Pancake, NotUsed> fryingPan =
 Flow.of(ScoopOfBatter.class).map(batter -> new Pancake());
 
 Flow<ScoopOfBatter, Pancake, NotUsed> pancakeChef =
 Flow.fromGraph(GraphDSL.create(builder -> {
 final UniformFanInShape<Pancake, Pancake> mergePancakes =
 builder.add(Merge.create(2));
 final UniformFanOutShape<ScoopOfBatter, ScoopOfBatter> dispatchBatter =
 builder.add(Balance.create(2));
 
 builder.from(dispatchBatter.out(0)) .via(builder.add(fryingPan.async())) .toInlet(mergePancakes.in(0));
 builder.from(dispatchBatter.out(1)) .via(builder.add(fryingPan.async())) .toInlet(mergePancakes.in(1));
 
 return FlowShape.of(dispatchBatter.in(), mergePancakes.out());
 })); section in docs
Patriks parallel pancakes val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
 Flow.fromGraph(GraphDSL.create() { implicit builder =>
 import GraphDSL.Implicits._ 
 val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
 val mergePancakes = builder.add(Merge[Pancake](2))
 
 // Using two pipelines, having two frying pans each, in total using
 // four frying pans
 dispatchBatter.out(0) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(0)
 dispatchBatter.out(1) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(1)
 
 FlowShape(dispatchBatter.in, mergePancakes.out)
 }) section in docs
Making pancakes together HungrySink Pan 3 BatterSource Pan 1 Balance Merge Pan 2 Pan 4 nom nom nom
Built in stages Flow stages map/fromFunction, mapConcat, statefulMapConcat, filter, filterNot, collect, grouped, sliding, scan, scanAsync, fold, foldAsync, reduce, drop, take, takeWhile, dropWhile, recover, recoverWith, recoverWithRetries, mapError, detach, throttle, intersperse, limit, limitWeighted, log, recoverWithRetries, mapAsync, mapAsyncUnordered, takeWithin, dropWithin, groupedWithin, initialDelay, delay, conflate, conflateWithSeed, batch, batchWeighted, expand, buffer, prefixAndTail, groupBy, splitWhen, splitAfter, flatMapConcat, flatMapMerge, initialTimeout, completionTimeout, idleTimeout, backpressureTimeout, keepAlive, initialDelay, merge, mergeSorted, Source stages fromIterator, apply, single, repeat, cycle, tick, fromFuture, fromCompletionStage, unfold, unfoldAsync, empty, maybe, failed, lazily, actorPublisher, actorRef, combine, unfoldResource, unfoldResourceAsync, queue, asSubscriber, fromPublisher, zipN, zipWithN Sink stages head, headOption, last, lastOption, ignore, cancelled, seq, foreach, foreachParallel, onComplete, lazyInit, queue, fold, reduce, combine, actorRef, actorRefWithAck, actorSubscriber, asPublisher, fromSubscriber Additional Sink and Source converters {from,as}OutputStream, {from,as}InputStream, {as,from} javaCollector, javaCollectorParallelUnordered File IO Sinks and Sources fromPath, toPath mergePreferred, zip, zipWith, zipWithIndex, concat, prepend, orElse, interleave, unzip, unzipWith, broadcast, balance, partition, watchTermination, monitor Even more Framing, JSON framing, killswitch, BroadcastHub, MergeHub
But I want to connect other things!
A community for Akka Streams connectors http://github.com/akka/alpakka Alpakka
Alpakka – a community for Stream connectors Existing Alpakka MQTT AMQP/ RabbitMQ SSE Cassandra FTP/ SFTP JSON, XML, CSV, RecordIO IronMq Files AWS DynamoDB AWS SNS,SQS, S3, Kinesis,Lambda JMS Azure Storage Queue TCP In Akka Actors Reactive Streams Java Streams Basic File IO External Apache Geode Eventuate FS2 Akka Http HBase http://developer.lightbend.com/docs/alpakka/current/index.html and more… Camel Kafka MongoDB Azure IoT
But my usecase is a unique snowflake! ❄ ❄ ❄
GraphStage API public class Map<A, B> extends GraphStage<FlowShape<A, B>> { private final Function<A, B> f; public final Inlet<A> in = Inlet.create("Map.in"); public final Outlet<B> out = Outlet.create("Map.out"); private final FlowShape<A, B> shape = FlowShape.of(in, out); public Map(Function<A, B> f) { this.f = f; } public FlowShape<A, B> shape() { return shape; } public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { { setHandler(in, new AbstractInHandler() { public void onPush() throws Exception { push(out, f.apply(grab(in))); } }); setHandler(out, new AbstractOutHandler() { public void onPull() throws Exception { pull(in); } }); } }; } }
public class Map<A, B> extends GraphStage<FlowShape<A, B>> { private final Function<A, B> f; public final Inlet<A> in = Inlet.create("Map.in"); public final Outlet<B> out = Outlet.create("Map.out"); private final FlowShape<A, B> shape = FlowShape.of(in, out); public Map(Function<A, B> f) { this.f = f; } public FlowShape<A, B> shape() { return shape; } public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { { setHandler(in, new AbstractInHandler() { public void onPush() throws Exception { push(out, f.apply(grab(in))); } }); complete sources on github GraphStage API
public class Map<A, B> extends GraphStage<FlowShape<A, B>> { private final Function<A, B> f; public final Inlet<A> in = Inlet.create("Map.in"); public final Outlet<B> out = Outlet.create("Map.out"); private final FlowShape<A, B> shape = FlowShape.of(in, out); public Map(Function<A, B> f) { this.f = f; } public FlowShape<A, B> shape() { return shape; } public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { { setHandler(in, new AbstractInHandler() { public void onPush() throws Exception { push(out, f.apply(grab(in))); } }); complete sources on github GraphStage API
} public FlowShape<A, B> shape() { return shape; } public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { { setHandler(in, new AbstractInHandler() { public void onPush() throws Exception { push(out, f.apply(grab(in))); } }); setHandler(out, new AbstractOutHandler() { public void onPull() throws Exception { pull(in); } }); } }; } } complete sources on github GraphStage API
GraphStage API class Map[A, B](f: A => B) extends GraphStage[FlowShape[A, B]] { val in = Inlet[A]("Map.in") val out = Outlet[B]("Map.out") val shape = FlowShape.of(in, out) def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { setHandler(in, new InHandler() { def onPush(): Unit = { push(out, f(grab(in))) } }) setHandler(out, new AbstractOutHandler() { def onPull(): Unit = { pull(in) } }) } }
When to use what abstraction modelling power complexity actors streams (completable) futures java.concurrency
Actors ❤ Streams = True Actors – perfect for managing state, things with lifecycles, restarting, running many separate instances in parallel A running stream – can be seen as a state, has a lifecycle, is - not yet started, running, failed, stopped, can be stopped using special stages blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-i/ Again, Colin Breck:
But, distributed systems? Stream-in-Actor C-1 Shard C Akka Streams is a local abstraction Combine with Akka Cluster for distributed superpowers! Stream-in-Actor A-1 Shard A Stream-in-Actor B-1 Shard B Stream-in-Actor B-2
Streams between nodes Source Flow Flow Sink
JVM 2 ActorSystem JVM 1 ActorSystem (special) SinkSource Flow Flow Sink SourceRef StreamRefs — new feature since Akka 2.5.10
JVM 2 ActorSystem JVM 1 ActorSystem (special) SinkSource Flow Flow SinkSourceRef This end controls the pace through backpressure StreamRefs — new feature since Akka 2.5.10
SourceRef sending side Scala class SendingActor extends Actor { import context.dispatcher implicit val mat = ActorMaterializer()(context) val numbersSource = Source(1 to 100000) def receive = { case SendMeNumbers ⇒ val ref: Future[SourceRef[Int]] = numbersSource.runWith(StreamRefs.sourceRef()) val reply: Future[SendMeNumbersReply] = ref.map(ref => SendMeNumbersReply(ref)) reply pipeTo sender() } }
SourceRef receiving side Scala class ReceivingActor(sendingActor: ActorRef) extends Actor { sendingActor ! SendMeNumbers override def receive: Receive = { case SendMeNumbersReply(sourceRef) => sourceRef.runWith(Sink.foreach(println)) } }
StreamRefs — new feature since Akka 2.5.10 By design NOT a“distributed automatic stream processing deployment and management framework”. (e.g. how Spark, Flink or Beam could be described). Ideal for combining Alpakka Enterprise Integration Data Processing or Ingestion Pipelines between ActorSystems
The community Forums discuss.akka.io Public chat rooms: http://gitter.im/akka/dev developing Akka http://gitter.im/akka/akka using Akka Easy to contribute tickets: https://github.com/akka/akka/issues?q=is%3Aissue+is%3Aopen+label%3Aeasy-to-contribute https://github.com/akka/akka/issues?q=is%3Aissue+is%3Aopen+label%3A%22nice-to-have+%28low-prio%29%22
Questions @apnylle johan.andren@lightbend.com http://akka.io Akka
Thank you! @apnylle johan.andren@lightbend.com All sample code (Java & Scala) https://github.com/johanandren/akka-stream-samples/tree/jeeconf-2018 http://akka.io Akka

Reactive stream processing using Akka streams

  • 1.
    Akka streams Johan Andrén JEEConf,Kiev, 2018-05-19 Reactive stream processing using
  • 2.
    Johan Andrén Akka Team StockholmScala User Group @apnylle johan.andren@lightbend.com
  • 3.
    Build powerful reactive,concurrent, and distributed applications more easily Akka
  • 4.
    Actors – simple& high performance concurrency Cluster / Remoting, Cluster tools – location transparency, resilience – and prepackaged tools for building distributed systems Streams – back-pressured stream processing Persistence – CQRS + Event Sourcing for Actors HTTP – fully async streaming HTTP Server Complete Java & Scala APIs for all features What’s in the toolkit?
  • 5.
  • 6.
    Reactive Streams timeline Oct2013 RxJava, Akka and Twitter- people meeting “Soon thereafter” 2013 Reactive Streams Expert group formed Apr 2015 Reactive Streams Spec 1.0 TCK 5+ impls 2017 Inclusion in JDK9 Akka Streams, RxJava Vert.x, MongoDB, … Jul 2015 Akka Streams 1.0 Now/Future JEP 321: HTTP Client ADBA - Async JDBC Jakarta EE?
  • 7.
    Reactive Streams Reactive Streamsis an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols http://www.reactive-streams.org “
  • 8.
    Reactive Streams Reactive Streamsis an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols http://www.reactive-streams.org “
  • 9.
  • 10.
    Reactive Streams Reactive Streamsis an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols http://www.reactive-streams.org “
  • 11.
    Asynchronous stream processing SourceSink (possible) asynchronous boundaries Flow
  • 12.
    Reactive Streams Reactive Streamsis an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols http://www.reactive-streams.org “
  • 13.
    No back pressure SourceSink 10 msg/s 1 msg/s Flow asynchronous boundary
  • 14.
    No back pressure SourceSink 10 msg/s 1 msg/s Flow asynchronous boundary OOM!!
  • 15.
    No back pressure- bounded buffer Source Sink 10 msg/s 1 msg/s Flow buffer size 6 🗑 asynchronous boundary
  • 16.
    Async non blockingback pressure Source Sink 1 msg/s 1 msg/s Flow buffer size 6 🗑 asynchronous boundary Hey! give me 2 more
  • 17.
    Reactive Streams RS LibraryA RS library B async boundary
  • 18.
    Reactive Streams “Make buildingpowerful concurrent & distributed applications simple.”
  • 19.
    Complete and awesome Javaand Scala APIs (Just like everything in Akka) Akka Streams
  • 20.
    Akka Streams in~20 seconds: final ActorSystem system = ActorSystem.create();
 final Materializer materializer = ActorMaterializer.create(system);
 
 final Source<Integer, NotUsed> source =
 Source.range(0, 20000000);
 
 final Flow<Integer, String, NotUsed> flow =
 Flow.fromFunction((Integer n) -> n.toString());
 
 final Sink<String, CompletionStage<Done>> sink =
 Sink.foreach(str -> System.out.println(str));
 
 final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);
 
 runnable.run(materializer);
  • 21.
    Akka Streams in~20 seconds: final ActorSystem system = ActorSystem.create();
 final Materializer materializer = ActorMaterializer.create(system);
 
 final Source<Integer, NotUsed> source =
 Source.range(0, 20000000);
 
 final Flow<Integer, String, NotUsed> flow =
 Flow.fromFunction((Integer n) -> n.toString());
 
 final Sink<String, CompletionStage<Done>> sink =
 Sink.foreach(str -> System.out.println(str));
 
 final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);
 
 runnable.run(materializer);
  • 22.
    Akka Streams in~20 seconds: final ActorSystem system = ActorSystem.create();
 final Materializer materializer = ActorMaterializer.create(system);
 
 final Source<Integer, NotUsed> source =
 Source.range(0, 20000000);
 
 final Flow<Integer, String, NotUsed> flow =
 Flow.fromFunction((Integer n) -> n.toString());
 
 final Sink<String, CompletionStage<Done>> sink =
 Sink.foreach(str -> System.out.println(str));
 
 final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);
 
 runnable.run(materializer);
  • 23.
    final ActorSystem system= ActorSystem.create();
 final Materializer materializer = ActorMaterializer.create(system);
 
 final Source<Integer, NotUsed> source =
 Source.range(0, 20000000);
 
 final Flow<Integer, String, NotUsed> flow =
 Flow.fromFunction((Integer n) -> n.toString());
 
 final Sink<String, CompletionStage<Done>> sink =
 Sink.foreach(str -> System.out.println(str));
 
 final RunnableGraph<NotUsed> runnable = source.via(flow).to(sink);
 
 runnable.run(materializer); Akka Streams in ~20 seconds: Source (range) Integer Flow (fromFunction) Sink (foreach) Integer String String
  • 24.
    Akka Streams in~20 seconds: implicit val system = ActorSystem()
 implicit val mat = ActorMaterializer()
 
 val source = Source(0 to 20000000)
 
 val flow = Flow[Int].map(_.toString())
 
 val sink = Sink.foreach[String](println(_))
 
 val runnable = source.via(flow).to(sink)
 
 runnable.run()
  • 25.
    Akka Streams in~20 seconds: Source.range(0, 20000000)
 .map(Object::toString)
 .runForeach(str -> System.out.println(str), materializer);
  • 26.
    Akka Streams in~20 seconds: Source(0 to 20000000)
 .map(_.toString)
 .runForeach(println)
  • 27.
    final ActorSystem system= ActorSystem.create(); final Materializer materializer = ActorMaterializer.create(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 9000); final Http http = Http.get(system); // Note that a more realistic solution would use the EntityStreaming API to stream elements // as for example JSON Streaming (see the docs for more details) final Source<ByteString, NotUsed> numbers = Source.range(0, Integer.MAX_VALUE) .map(n -> ByteString.fromString(n.toString() + "n")); final Route route = path("numbers", () -> get(() -> complete(HttpResponse.create() .withStatus(StatusCodes.OK) .withEntity(HttpEntities.create( ContentTypes.TEXT_PLAIN_UTF8, numbers ))) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); System.out.println("Akka HTTP server running at " + address.getHostString() + ":" + address.getPort()); } else { System.out.print("Failed to bind HTTP server: " + exception.getMessage()); exception.fillInStackTrace(); } }); Numbers as a service
  • 28.
    final ActorSystem system= ActorSystem.create(); final Materializer materializer = ActorMaterializer.create(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 9000); final Http http = Http.get(system); // Note that a more realistic solution would use the EntityStreaming API to stream e // as for example JSON Streaming (see the docs for more details) final Source<ByteString, NotUsed> numbers = Source.range(0, Integer.MAX_VALUE) .map(n -> ByteString.fromString(n.toString() + "n")); final Route route = path("numbers", () -> get(() -> complete(HttpResponse.create() .withStatus(StatusCodes.OK) .withEntity(HttpEntities.create( ContentTypes.TEXT_PLAIN_UTF8, numbers ))) ) ); Numbers as a service
  • 29.
    final Materializer materializer= ActorMaterializer.create(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 9000); final Http http = Http.get(system); // Note that a more realistic solution would use the EntityStreaming API to stream e // as for example JSON Streaming (see the docs for more details) final Source<ByteString, NotUsed> numbers = Source.range(0, Integer.MAX_VALUE) .map(n -> ByteString.fromString(n.toString() + "n")); final Route route = path("numbers", () -> get(() -> complete(HttpResponse.create() .withStatus(StatusCodes.OK) .withEntity(HttpEntities.create( ContentTypes.TEXT_PLAIN_UTF8, numbers ))) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); Numbers as a service
  • 30.
    complete(HttpResponse.create() .withStatus(StatusCodes.OK) .withEntity(HttpEntities.create( ContentTypes.TEXT_PLAIN_UTF8, numbers ))) ) ); final CompletionStage<ServerBinding> bindingCompletionStage= http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); System.out.println("Akka HTTP server running at " + address.getHostString() + ": address.getPort()); } else { System.out.print("Failed to bind HTTP server: " + exception.getMessage()); exception.fillInStackTrace(); } }); Numbers as a service
  • 31.
    Numbers as aservice implicit val system = ActorSystem() import system.dispatcher implicit val mat = ActorMaterializer() val numbers = Source(0 to Int.MaxValue).map(n => ByteString(n + "n")) val route = path("numbers") { get { complete( HttpResponse(entity = HttpEntity(`text/plain(UTF-8)`, numbers)) ) } } val futureBinding = Http().bindAndHandle(route, "127.0.0.1", 8080) futureBinding.onComplete { case Success(binding) => val address = binding.localAddress println(s"Akka HTTP server running at ${address.getHostString}:${address.getPort}") case Failure(ex) => println(s"Failed to bind HTTP server: ${ex.getMessage}") ex.fillInStackTrace() }
  • 32.
    recv buffer send buffer 🚚 🚚 🚚 🚚 🚚 🚚 🚚 Backpressure over TCP numbers TCP HTTP Server Client
  • 33.
    recv buffer send buffer 🚚 🚚 🚚 🚚 🚚 🚚 🚑 Backpressure over TCP numbers TCP HTTP Backpressure Server Client
  • 34.
    recv buffer send buffer 🚚 🚚 🚚 🚚 🚚 🚚 🚚 🚚 🚚 🚚 🚑 Backpressure over TCP numbers TCP HTTP Backpressure Backpressure Server Client
  • 35.
    final ActorSystem system= ActorSystem.create(); final Materializer materializer = ActorMaterializer.create(system); final Database database = new Database(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 8080); final Http http = Http.get(system); final Flow<Message, Message, NotUsed> measurementsFlow = Flow.of(Message.class) .flatMapConcat((Message message) -> // handles both strict and streamed ws messages by folding // the later into a single string (in memory) message.asTextMessage() .getStreamedText() .fold("", (acc, elem) -> acc + elem) ) .groupedWithin(1000, Duration.of(1, ChronoUnit.SECONDS)) .mapAsync(5, database::asyncBulkInsert) .map(written -> TextMessage.create( "wrote up to: " + written.get(written.size() - 1) ) ); final Route route = path("measurements", () -> get(() -> handleWebSocketMessages(measurementsFlow) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); System.out.println("Akka HTTP server running at " + address.getHostString() + ":" + address.getPort()); } else { System.out.print("Failed to bind HTTP server: " + exception.getMessage()); exception.fillInStackTrace(); } }); A more useful example Credit to: Colin Breck
  • 36.
    final Materializer materializer= ActorMaterializer.create(system); final Database database = new Database(system); final ConnectHttp host = ConnectHttp.toHost("127.0.0.1", 8080); final Http http = Http.get(system); final Flow<Message, Message, NotUsed> measurementsFlow = Flow.of(Message.class) .flatMapConcat((Message message) -> // handles both strict and streamed ws messages by folding // the later into a single string (in memory) message.asTextMessage() .getStreamedText() .fold("", (acc, elem) -> acc + elem) ) .groupedWithin(1000, Duration.of(1, ChronoUnit.SECONDS)) .mapAsync(5, database::asyncBulkInsert) .map(written -> TextMessage.create( "wrote up to: " + written.get(written.size() - 1) ) ); final Route route = path("measurements", () -> get(() -> handleWebSocketMessages(measurementsFlow) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = A more useful example
  • 37.
    final Flow<Message, Message,NotUsed> measurementsFlow = Flow.of(Message.class) .flatMapConcat((Message message) -> // handles both strict and streamed ws messages by folding // the later into a single string (in memory) message.asTextMessage() .getStreamedText() .fold("", (acc, elem) -> acc + elem) ) .groupedWithin(1000, Duration.of(1, ChronoUnit.SECONDS)) .mapAsync(5, database::asyncBulkInsert) .map(written -> TextMessage.create( "wrote up to: " + written.get(written.size() - 1) ) ); final Route route = path("measurements", () -> get(() -> handleWebSocketMessages(measurementsFlow) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); A more useful example
  • 38.
    .map(written -> TextMessage.create( "wrote upto: " + written.get(written.size() - 1) ) ); final Route route = path("measurements", () -> get(() -> handleWebSocketMessages(measurementsFlow) ) ); final CompletionStage<ServerBinding> bindingCompletionStage = http.bindAndHandle(route.flow(system, materializer), host, materializer); bindingCompletionStage.whenComplete((binding, exception) -> { if (binding != null) { final InetSocketAddress address = binding.localAddress(); System.out.println("Akka HTTP server running at " + address.getHostString() + ":" address.getPort()); } else { System.out.print("Failed to bind HTTP server: " + exception.getMessage()); exception.fillInStackTrace(); } }); A more useful example
  • 39.
    A more usefulexample val measurementsFlow =
 Flow[Message].flatMapConcat(message =>
 message.asTextMessage.getStreamedText.fold("")(_ + _)
 )
 .groupedWithin(1000, 1.second)
 .mapAsync(5)(Database.asyncBulkInsert)
 .map(written => TextMessage("wrote up to: " + written.last))
 
 val route =
 path("measurements") {
 get {
 handleWebSocketMessages(measurementsFlow)
 }
 }
 
 val futureBinding = Http().bindAndHandle(route, "127.0.0.1", 8080)
  • 40.
    The tale ofthe two pancake chefs HungrySink Frying Pan BatterSource Scoops of batter Pancakes nom nom nom asynchronous boundaries Roland Patrik
  • 41.
    Rolands pipelined pancakes HungrySinkPan2BatterSource Pan 1 nom nom nom
  • 42.
    Rolands pipelined pancakes Flow<ScoopOfBatter,HalfCookedPancake, NotUsed> fryingPan1 =
 Flow.of(ScoopOfBatter.class).map(batter -> new HalfCookedPancake());
 
 Flow<HalfCookedPancake, Pancake, NotUsed> fryingPan2 =
 Flow.of(HalfCookedPancake.class).map(halfCooked -> new Pancake()); Flow<ScoopOfBatter, Pancake, NotUsed> pancakeChef =
 fryingPan1.async().via(fryingPan2.async()); section in docs
  • 43.
    Rolands pipelined pancakes //Takes a scoop of batter and creates a pancake with one side cooked val fryingPan1: Flow[ScoopOfBatter, HalfCookedPancake, NotUsed] =
 Flow[ScoopOfBatter].map { batter => HalfCookedPancake() }
 
 // Finishes a half-cooked pancake
 val fryingPan2: Flow[HalfCookedPancake, Pancake, NotUsed] =
 Flow[HalfCookedPancake].map { halfCooked => Pancake() } 
 // With the two frying pans we can fully cook pancakes val pancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
 Flow[ScoopOfBatter].via(fryingPan1.async).via(fryingPan2.async) section in docs
  • 44.
    Patriks parallel pancakes HungrySink Pan2 BatterSource Pan 1 Balance Merge nom nom nom
  • 45.
    Patriks parallel pancakes Flow<ScoopOfBatter,Pancake, NotUsed> fryingPan =
 Flow.of(ScoopOfBatter.class).map(batter -> new Pancake());
 
 Flow<ScoopOfBatter, Pancake, NotUsed> pancakeChef =
 Flow.fromGraph(GraphDSL.create(builder -> {
 final UniformFanInShape<Pancake, Pancake> mergePancakes =
 builder.add(Merge.create(2));
 final UniformFanOutShape<ScoopOfBatter, ScoopOfBatter> dispatchBatter =
 builder.add(Balance.create(2));
 
 builder.from(dispatchBatter.out(0)) .via(builder.add(fryingPan.async())) .toInlet(mergePancakes.in(0));
 builder.from(dispatchBatter.out(1)) .via(builder.add(fryingPan.async())) .toInlet(mergePancakes.in(1));
 
 return FlowShape.of(dispatchBatter.in(), mergePancakes.out());
 })); section in docs
  • 46.
    Patriks parallel pancakes valpancakeChef: Flow[ScoopOfBatter, Pancake, NotUsed] =
 Flow.fromGraph(GraphDSL.create() { implicit builder =>
 import GraphDSL.Implicits._ 
 val dispatchBatter = builder.add(Balance[ScoopOfBatter](2))
 val mergePancakes = builder.add(Merge[Pancake](2))
 
 // Using two pipelines, having two frying pans each, in total using
 // four frying pans
 dispatchBatter.out(0) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(0)
 dispatchBatter.out(1) ~> fryingPan1.async ~> fryingPan2.async ~> mergePancakes.in(1)
 
 FlowShape(dispatchBatter.in, mergePancakes.out)
 }) section in docs
  • 47.
    Making pancakes together HungrySink Pan3 BatterSource Pan 1 Balance Merge Pan 2 Pan 4 nom nom nom
  • 48.
    Built in stagesFlow stages map/fromFunction, mapConcat, statefulMapConcat, filter, filterNot, collect, grouped, sliding, scan, scanAsync, fold, foldAsync, reduce, drop, take, takeWhile, dropWhile, recover, recoverWith, recoverWithRetries, mapError, detach, throttle, intersperse, limit, limitWeighted, log, recoverWithRetries, mapAsync, mapAsyncUnordered, takeWithin, dropWithin, groupedWithin, initialDelay, delay, conflate, conflateWithSeed, batch, batchWeighted, expand, buffer, prefixAndTail, groupBy, splitWhen, splitAfter, flatMapConcat, flatMapMerge, initialTimeout, completionTimeout, idleTimeout, backpressureTimeout, keepAlive, initialDelay, merge, mergeSorted, Source stages fromIterator, apply, single, repeat, cycle, tick, fromFuture, fromCompletionStage, unfold, unfoldAsync, empty, maybe, failed, lazily, actorPublisher, actorRef, combine, unfoldResource, unfoldResourceAsync, queue, asSubscriber, fromPublisher, zipN, zipWithN Sink stages head, headOption, last, lastOption, ignore, cancelled, seq, foreach, foreachParallel, onComplete, lazyInit, queue, fold, reduce, combine, actorRef, actorRefWithAck, actorSubscriber, asPublisher, fromSubscriber Additional Sink and Source converters {from,as}OutputStream, {from,as}InputStream, {as,from} javaCollector, javaCollectorParallelUnordered File IO Sinks and Sources fromPath, toPath mergePreferred, zip, zipWith, zipWithIndex, concat, prepend, orElse, interleave, unzip, unzipWith, broadcast, balance, partition, watchTermination, monitor Even more Framing, JSON framing, killswitch, BroadcastHub, MergeHub
  • 49.
    But I wantto connect other things!
  • 50.
    A community forAkka Streams connectors http://github.com/akka/alpakka Alpakka
  • 51.
    Alpakka – acommunity for Stream connectors Existing Alpakka MQTT AMQP/ RabbitMQ SSE Cassandra FTP/ SFTP JSON, XML, CSV, RecordIO IronMq Files AWS DynamoDB AWS SNS,SQS, S3, Kinesis,Lambda JMS Azure Storage Queue TCP In Akka Actors Reactive Streams Java Streams Basic File IO External Apache Geode Eventuate FS2 Akka Http HBase http://developer.lightbend.com/docs/alpakka/current/index.html and more… Camel Kafka MongoDB Azure IoT
  • 52.
    But my usecaseis a unique snowflake! ❄ ❄ ❄
  • 53.
    GraphStage API public classMap<A, B> extends GraphStage<FlowShape<A, B>> { private final Function<A, B> f; public final Inlet<A> in = Inlet.create("Map.in"); public final Outlet<B> out = Outlet.create("Map.out"); private final FlowShape<A, B> shape = FlowShape.of(in, out); public Map(Function<A, B> f) { this.f = f; } public FlowShape<A, B> shape() { return shape; } public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { { setHandler(in, new AbstractInHandler() { public void onPush() throws Exception { push(out, f.apply(grab(in))); } }); setHandler(out, new AbstractOutHandler() { public void onPull() throws Exception { pull(in); } }); } }; } }
  • 54.
    public class Map<A,B> extends GraphStage<FlowShape<A, B>> { private final Function<A, B> f; public final Inlet<A> in = Inlet.create("Map.in"); public final Outlet<B> out = Outlet.create("Map.out"); private final FlowShape<A, B> shape = FlowShape.of(in, out); public Map(Function<A, B> f) { this.f = f; } public FlowShape<A, B> shape() { return shape; } public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { { setHandler(in, new AbstractInHandler() { public void onPush() throws Exception { push(out, f.apply(grab(in))); } }); complete sources on github GraphStage API
  • 55.
    public class Map<A,B> extends GraphStage<FlowShape<A, B>> { private final Function<A, B> f; public final Inlet<A> in = Inlet.create("Map.in"); public final Outlet<B> out = Outlet.create("Map.out"); private final FlowShape<A, B> shape = FlowShape.of(in, out); public Map(Function<A, B> f) { this.f = f; } public FlowShape<A, B> shape() { return shape; } public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { { setHandler(in, new AbstractInHandler() { public void onPush() throws Exception { push(out, f.apply(grab(in))); } }); complete sources on github GraphStage API
  • 56.
    } public FlowShape<A, B>shape() { return shape; } public GraphStageLogic createLogic(Attributes inheritedAttributes) { return new GraphStageLogic(shape) { { setHandler(in, new AbstractInHandler() { public void onPush() throws Exception { push(out, f.apply(grab(in))); } }); setHandler(out, new AbstractOutHandler() { public void onPull() throws Exception { pull(in); } }); } }; } } complete sources on github GraphStage API
  • 57.
    GraphStage API class Map[A,B](f: A => B) extends GraphStage[FlowShape[A, B]] { val in = Inlet[A]("Map.in") val out = Outlet[B]("Map.out") val shape = FlowShape.of(in, out) def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new GraphStageLogic(shape) { setHandler(in, new InHandler() { def onPush(): Unit = { push(out, f(grab(in))) } }) setHandler(out, new AbstractOutHandler() { def onPull(): Unit = { pull(in) } }) } }
  • 58.
    When to usewhat abstraction modelling power complexity actors streams (completable) futures java.concurrency
  • 59.
    Actors ❤ Streams= True Actors – perfect for managing state, things with lifecycles, restarting, running many separate instances in parallel A running stream – can be seen as a state, has a lifecycle, is - not yet started, running, failed, stopped, can be stopped using special stages blog.colinbreck.com/integrating-akka-streams-and-akka-actors-part-i/ Again, Colin Breck:
  • 60.
    But, distributed systems? Stream-in-Actor C-1 ShardC Akka Streams is a local abstraction Combine with Akka Cluster for distributed superpowers! Stream-in-Actor A-1 Shard A Stream-in-Actor B-1 Shard B Stream-in-Actor B-2
  • 61.
  • 62.
    JVM 2 ActorSystem JVM 1 ActorSystem (special) SinkSourceFlow Flow Sink SourceRef StreamRefs — new feature since Akka 2.5.10
  • 63.
    JVM 2 ActorSystem JVM 1 ActorSystem (special) SinkSourceFlow Flow SinkSourceRef This end controls the pace through backpressure StreamRefs — new feature since Akka 2.5.10
  • 64.
    SourceRef sending side Scala classSendingActor extends Actor { import context.dispatcher implicit val mat = ActorMaterializer()(context) val numbersSource = Source(1 to 100000) def receive = { case SendMeNumbers ⇒ val ref: Future[SourceRef[Int]] = numbersSource.runWith(StreamRefs.sourceRef()) val reply: Future[SendMeNumbersReply] = ref.map(ref => SendMeNumbersReply(ref)) reply pipeTo sender() } }
  • 65.
    SourceRef receiving side Scala classReceivingActor(sendingActor: ActorRef) extends Actor { sendingActor ! SendMeNumbers override def receive: Receive = { case SendMeNumbersReply(sourceRef) => sourceRef.runWith(Sink.foreach(println)) } }
  • 66.
    StreamRefs — newfeature since Akka 2.5.10 By design NOT a“distributed automatic stream processing deployment and management framework”. (e.g. how Spark, Flink or Beam could be described). Ideal for combining Alpakka Enterprise Integration Data Processing or Ingestion Pipelines between ActorSystems
  • 67.
    The community Forums discuss.akka.io Public chatrooms: http://gitter.im/akka/dev developing Akka http://gitter.im/akka/akka using Akka Easy to contribute tickets: https://github.com/akka/akka/issues?q=is%3Aissue+is%3Aopen+label%3Aeasy-to-contribute https://github.com/akka/akka/issues?q=is%3Aissue+is%3Aopen+label%3A%22nice-to-have+%28low-prio%29%22
  • 68.
  • 69.
    Thank you! @apnylle johan.andren@lightbend.com All samplecode (Java & Scala) https://github.com/johanandren/akka-stream-samples/tree/jeeconf-2018 http://akka.io Akka