Johan Andrén | Scala Swarm 2017 Building reactive distributed systems with Akka
Johan Andrén, Akka Team
Stockholm Scala User Group
@apnylle | johan.andren@lightbend.com
A man, a plan…
- Akka recap, distsys background and Akka Cluster basics: an overview of how Akka does clustering
- Cluster Tools:
  - Distributed Data: eventual consistency through gossip
  - Distributed PubSub: a message bus across the cluster
  - Cluster Singleton: how to introduce a single point of failure
  - Cluster Sharding: shard actors across the cluster
Akka Actors Recap
[Diagram: an actor with its message inbox, processing incoming messages one at a time]
[Diagram: actors communicating by sending messages to each other]
[Diagram: parent and child actors in an ActorSystem; failures are handled by the parent]
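A minimal sketch of what this looks like in code, using the classic actor API (the actor and names here are illustrative, not from the talk's samples):

import akka.actor.{Actor, ActorSystem, Props}

// Minimal illustrative actor: it processes the messages in its inbox one at a
// time, and the outside world interacts with it only by sending messages.
class Greeter extends Actor {
  def receive = {
    case name: String => println(s"Hello, $name!")
  }
}

object RecapApp extends App {
  val system = ActorSystem("recap")
  val greeter = system.actorOf(Props[Greeter], "greeter") // child of the /user guardian
  greeter ! "Scala Swarm" // asynchronous, fire-and-forget message send
}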
Distributed Systems
"A distributed system is one in which the failure of a computer you didn't even know existed can render your own computer unusable"
–Leslie Lamport
Why is it so hard? The Joys of Computer Networks:
- Reliability: power failure, old network equipment, network congestion, coffee in the router, rodents, that guy in the IT dept., DDoS attacks…
- Latency: loopback vs local net vs shared congested local net vs internet
- Bandwidth: again, loopback vs local vs shared local vs internet
Why do it, if it is so hard?
- Data or processing doesn't fit on a single machine: many objects that should be kept in memory. Many not-so-powerful servers can be cheaper than a supercomputer.
- Elasticity: being able to scale in (fewer servers) and out (more servers) depending on load. Not paying for servers unless you need them.
- Resilience: building systems that keep working in the face of failures, or degrade gracefully.
Actor Model vs Network
- Interaction is already modelled as immutable messages: data travels over the network in packets, and changes have to be explicitly sent back.
- At most once: data reaches a node on the other side at most once, but can be lost; this is already part of the model!
- A recipient of a message can reply directly to the sender, regardless of whether there were intermediate recipients of the message.
- Messages are not limited to request-response: messages can flow in either direction when two systems are connected.
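A small sketch of the "reply directly to the sender" property, with hypothetical actors (not from the talk's samples):

import akka.actor.{Actor, ActorRef}

// Hypothetical intermediate actor: forward keeps the original sender, so the
// final recipient can reply straight back to the originator.
class Proxy(target: ActorRef) extends Actor {
  def receive = {
    case msg => target forward msg
  }
}

class Worker extends Actor {
  def receive = {
    case request: String => sender() ! s"done: $request" // goes to the original sender
  }
}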
[Diagram: Local: actors exchanging messages within a single ActorSystem]
[Diagram: Distributed: actors exchanging messages across two ActorSystems on JVM 1 and JVM 2]
Consistency vs Availability
[Diagram: a spectrum from Strong Consistency to Always Available; a network partition (✂) between Node 1 and Node 2]
Akka Cluster
[Diagram: Joining: a new node sends Join to a node in the cluster]
[Diagram: Leadership: one node acts as (leader)]
[Diagram: Leadership: when the leader is Downed, another node takes over as (leader)]
Cluster Member Lifecycle
[State diagram: Joining → Up → Leaving → Exiting → Removed; unreachable members can be marked Down and are then Removed. Join and Leave are explicit, the remaining transitions are Leader Actions.]
Joining: what if there is no cluster yet?
Seed nodes!
- The first seed node, if none of the other nodes in the list are in the cluster, joins itself to form the cluster.
- The rest of the seed nodes ping all the other nodes and join as soon as one node that is already in the cluster responds.
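Seed nodes are usually listed in configuration (as in the three-node sample later on), but joining can also be triggered programmatically; a small sketch with made-up addresses:

import akka.actor.{ActorSystem, Address}
import akka.cluster.Cluster

// Sketch: programmatic alternative to the seed-nodes configuration; the node
// tries the listed seed nodes and joins via the first one that responds.
val system = ActorSystem("cluster")
Cluster(system).joinSeedNodes(List(
  Address("akka", "cluster", "127.0.0.1", 25520),
  Address("akka", "cluster", "127.0.0.1", 25521)
))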
What would happen if we mess it up?
[Diagram: a split brain; nodes on both sides of a partition claim "I'm the leader, this is the cluster!" while Joins arrive on both sides]
Gossip & Heartbeats
[Diagram: nodes exchanging gossip and heartbeats; one node is the (leader)]
Roles
[Diagram: cluster nodes tagged with roles: [api], [api], [worker, backend], [worker], [worker]]
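Roles are plain node configuration; a minimal sketch (the config key is akka.cluster.roles, the rest of the names are illustrative):

import akka.actor.ActorSystem
import akka.cluster.Cluster
import com.typesafe.config.ConfigFactory

// Sketch: declare the roles this node carries, then branch on them so that,
// for example, worker actors are only started on "worker" nodes.
val workerConfig = ConfigFactory.parseString(
  """akka.cluster.roles = ["worker", "backend"]""")

val system = ActorSystem("cluster", workerConfig.withFallback(ConfigFactory.load()))
if (Cluster(system).selfRoles.contains("worker")) {
  // start worker actors here
}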
User API of Cluster
- Node details: what roles am I in, what is my address
- Join, Leave, Down: programmatic control over cluster membership
- Register listeners for cluster events: every time the cluster state changes, the listening actor gets a message
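A sketch of that API using the classic Cluster extension (the listener actor itself is illustrative, not from the talk's samples):

import akka.actor.{Actor, ActorLogging}
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._

// Illustrative listener: inspects its own node details and gets a message
// every time cluster membership changes.
class ClusterListener extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  override def preStart(): Unit = {
    log.info("My address: {}, my roles: {}", cluster.selfAddress, cluster.selfRoles)
    // register for membership and reachability events
    cluster.subscribe(self, initialStateMode = InitialStateAsEvents,
      classOf[MemberEvent], classOf[UnreachableMember])
  }

  override def postStop(): Unit = cluster.unsubscribe(self)

  def receive = {
    case MemberUp(member)          => log.info("Member up: {}", member.address)
    case UnreachableMember(member) => log.info("Member unreachable: {}", member.address)
    case MemberRemoved(member, _)  => log.info("Member removed: {}", member.address)
    case _: MemberEvent            => // ignore the other membership events
  }
}

// Programmatic membership control (Join, Leave, Down) lives on the same extension,
// e.g. Cluster(system).leave(Cluster(system).selfAddress)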
Three node cluster

import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

val commonConfig = ConfigFactory.parseString(
  """
  akka {
    actor.provider = cluster
    remote.artery.enabled = true
    remote.artery.canonical.hostname = 127.0.0.1
    cluster.seed-nodes = [
      "akka://cluster@127.0.0.1:25520",
      "akka://cluster@127.0.0.1:25521"
    ]
    cluster.jmx.multi-mbeans-in-same-jvm = on
  }
  """)

def portConfig(port: Int) =
  ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $port")

val node1 = ActorSystem("cluster", portConfig(25520).withFallback(commonConfig))
val node2 = ActorSystem("cluster", portConfig(25521).withFallback(commonConfig))
val node3 = ActorSystem("cluster", portConfig(25522).withFallback(commonConfig))

complete sample sources on github
Akka Cluster Tools
Distributed Data
CRDTs (Conflict-free Replicated Data Types) allow updates on any node; the update is then spread to the other cluster nodes through gossip, giving eventual consistency.
Note: does not fit every problem!
Online docs for Distributed Data
Special requirements
- Commutative: order of operations does not matter, like 3 + 4 = 4 + 3
- Associative: grouping of operations does not matter, like 3 + (4 + 5) = (3 + 4) + 5
- Monotonic: absence of rollbacks, "only growing" (but we can do sneaky tricks)
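To make those requirements concrete, here is a toy grow-only counter (an illustration only, not Akka's GCounter implementation):

// Toy CRDT: each node only increments its own slot, and merge takes the
// per-node maximum, so merging is commutative, associative and monotonic and
// replicas converge no matter in which order gossip arrives.
final case class ToyGCounter(perNode: Map[String, Long] = Map.empty) {
  def increment(node: String, n: Long = 1): ToyGCounter =
    ToyGCounter(perNode.updated(node, perNode.getOrElse(node, 0L) + n))

  def merge(that: ToyGCounter): ToyGCounter =
    ToyGCounter((perNode.keySet ++ that.perNode.keySet).map { node =>
      node -> math.max(perNode.getOrElse(node, 0L), that.perNode.getOrElse(node, 0L))
    }.toMap)

  def value: Long = perNode.values.sum
}
// a.merge(b) == b.merge(a), and (a merge b).merge(c) == a.merge(b merge c)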
Built-in data structures
- Counters: GCounter (grow only), PNCounter (increment and decrement counter)
- Sets: GSet (grow only), ORSet (observed remove set)
- Maps: ORMap (observed remove map), ORMultiMap (observed remove multi map), PNCounterMap (positive negative counter map), LWWMap (last writer wins map)
- Flags and registers: Flag (toggle-once boolean), LWWRegister (last writer wins register)
In Action
[Diagram: a Replicator on every node; clients send Update(key, …), Get(key) and Subscribe(key, actor) to their local Replicator, which gossips the data to the other Replicators]
Distributed Data

import akka.cluster.Cluster
import akka.cluster.ddata.{DistributedData, GCounter, GCounterKey, Replicator}

val replicator = DistributedData(system).replicator
val CounterKey = GCounterKey("visit-counter-1")
val InitialCounterValue = GCounter.empty

replicator ! Replicator.Subscribe(CounterKey, actorRef)

replicator ! Replicator.Update(
  key = CounterKey,
  initial = InitialCounterValue,
  writeConsistency = Replicator.WriteLocal
) { counter =>
  counter.increment(Cluster(system))
}

complete sample sources on github
Distributed Data
[Spectrum: Strong Consistency … Always Available; Distributed Data is eventually consistent and always accepts writes]
Distributed PubSub
[Diagram: Send(path, msg1) delivers msg1 to a registered actor; Publish(topic, msg2) delivers msg2 to every Subscriber of the topic]
Online docs for Distributed PubSub
With topics
[Diagram: Subscribers send Subscribe(topic) to their local Mediator]
[Diagram: the Mediators gossip the subscriptions to each other]
[Diagram: Publish(topic, msg) is routed by the Mediators to every Subscriber of the topic]
With actor path
[Diagram: Put(actorRef) registers /user/my-actor with the local Mediator; Send("/user/my-actor") delivers to one registered actor, SendToAll("/user/my-actor") to all of them]
Distributed PubSub

import akka.cluster.pubsub.{DistributedPubSub, DistributedPubSubMediator}

val mediator = DistributedPubSub(system).mediator
val actorRef = system.actorOf(props, "my-subscriber")

mediator ! DistributedPubSubMediator.Subscribe("my-topic", actorRef)

node3Mediator ! DistributedPubSubMediator.Publish(
  "my-topic", messageToAllSubscribers)

complete sample sources on github
Distributed PubSub
[Spectrum: Strong Consistency … Always Available; subscribers and topics are eventually consistent, writes are always accepted, messages are delivered at most once]
Cluster Singleton
Or: how to introduce a single point of failure
Online docs for Cluster Singleton
Cluster Singleton
[Diagram: a SingletonManager on every node; the manager on the (oldest) node starts the SingletonActor]
[Diagram: a SingletonProxy forwards Messages to the SingletonActor on the (oldest) node]
[Diagram: when the (oldest) node is Downed, the manager on the new oldest node starts a new SingletonActor]
Cluster Singleton

import akka.actor.PoisonPill
import akka.cluster.singleton.{ClusterSingletonManager, ClusterSingletonManagerSettings, ClusterSingletonProxy, ClusterSingletonProxySettings}

system.actorOf(
  ClusterSingletonManager.props(
    singletonProps = CounterActor.props(),
    terminationMessage = PoisonPill,
    settings = ClusterSingletonManagerSettings(system)
  ),
  "counter-singleton-manager")

val proxy = system.actorOf(
  ClusterSingletonProxy.props(
    singletonManagerPath = "/user/counter-singleton-manager",
    settings = ClusterSingletonProxySettings(system)
  ),
  "counter-singleton-proxy")

proxy ! CountMessage

complete sample sources on github
Cluster Singleton
[Spectrum: Strong Consistency … Always Available; only one cluster singleton will ever be alive]
Cluster Sharding
Shard actors based on a message property
[Diagram: Message(recipientId = 1) is routed to the actor with id 1]
Online docs for Cluster Sharding
Cluster Sharding
[Diagram: a ShardCoordinator (singleton) and a ShardRegion on every node; shards A, B and C hold entities A-1, A-7, B-4 and C-1]
[Diagram: an Envelope("c-1") sent to any ShardRegion is routed to entity C-1 in shard C]
[Diagram: when the node hosting shard C is Downed, shard C and entity C-1 are recreated on another node and Envelope("c-1") is routed there]
Cluster Sharding

import akka.cluster.sharding.{ClusterSharding, ClusterShardingSettings, ShardRegion}

case class Envelope(entityId: String, payload: Any)

val extractEntityIdFunction: ShardRegion.ExtractEntityId = {
  case Envelope(entityId, payload) => (entityId, payload)
}

val extractShardIdFunction: ShardRegion.ExtractShardId = {
  case Envelope(entityId, _) =>
    (entityId.hashCode % NumberOfShards).toString
}

val region = ClusterSharding(node).start(
  typeName = ShardTypeName,
  entityProps = CountingActor.props,
  settings = ClusterShardingSettings(node),
  extractEntityId = extractEntityIdFunction,
  extractShardId = extractShardIdFunction
)

// any node's region can be used to send to an entity
region ! Envelope(entityId = "1", payload = "Hello actor 1")

complete sample sources on github
Cluster Sharding
[Spectrum: Strong Consistency … Always Available; builds on Cluster Singleton to guarantee a single instance of each entity]
akka.io
What’s up next?
Thanks for listening! Complete Samples: https://github.com/johanandren/akka-cluster-samples/tree/1.0 Docs, news, community links: akka.io @apnylle johan.andren@lightbend.com

Building reactive distributed systems with Akka