Skip to content

Commit ac5b09d

Browse files
committed
add lifecycle Monitoring
1 parent 4c488fb commit ac5b09d

File tree

1 file changed

+105
-0
lines changed

1 file changed

+105
-0
lines changed

reactive-programming.org

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1754,3 +1754,108 @@ class Listener(source: ActorRef) extends Actor {
17541754
Actor-local state cannot be kept across restarts, only external state can be managed like this.
17551755

17561756
Child actors not stopped during restart will be restarted recursively.
1757+
** Lifecycle Monitoring and the Error Kernel
1758+
*** Lifecycle Monitoring
1759+
The only observable transition occurs when stopping an actor:
1760+
- having an =ActorRef= implies liveness (at some earlier point)
1761+
- restarts are not externally visible
1762+
- after stop there will be no more responses
1763+
No replies could also be due to communication failure, therefore Akka supports Lifecycle Monitoring a.k.a. DeathWatch.
1764+
- an =Actor= registers its interest using =context.watch(target)=
1765+
- it will receive a =Terminated(target)= message when target stops
1766+
- it will not receive any direct messages from target thereafter
1767+
*** The DeathWatch API
1768+
#+begin_src scala
1769+
trait ActorContext {
1770+
def watch(target: ActorRef): ActorRef
1771+
def unwatch(target: ActorRef): ActorRef
1772+
...
1773+
}
1774+
1775+
case class Terminated private[akka] (actor: ActorRef)
1776+
(val existenceConfired: Boolean, val addressTerminated: Boolean)
1777+
extends AutoReceiveMessage with PossiblyHarmful
1778+
#+end_src
1779+
*** The Children List
1780+
Each =actor= maintains a list of the =actors= it created:
1781+
- the =child= has been entered when =context.actorOf= returns
1782+
- the =child= has been removed when =Terminated= is received
1783+
- an =actor= name is available IFF there is no such =child=
1784+
#+begin_src scala
1785+
trait ActorContext {
1786+
def children: Iterable[ActorRef]
1787+
def child(name: String): Option[ActorRef]
1788+
...
1789+
}
1790+
#+end_src
1791+
*** Applying DeathWatch to Contoller & Getter
1792+
#+begin_src scala
1793+
class Controller extends Actor with ActorLogging {
1794+
override val supervisorStrategy = OneForOneStrategy(maxNrOfRetries = 5) {
1795+
case _: Exception => SupervisorStrategy.Restart
1796+
}
1797+
def receive = {
1798+
case Check(url, depth) =>
1799+
if (!cache(url) && depth > 0)
1800+
context.watch(context.actorOf(getterProps(url, depth - 1)))
1801+
cache += url
1802+
case Terminated(_) =>
1803+
if (context.children.isEmpty) context.parent ! Result(cache)
1804+
case ReceiveTimeout => context.children foreach context.stop
1805+
}
1806+
...
1807+
}
1808+
#+end_src
1809+
*** The Error Kernel
1810+
Keep important data near the root, delegate risk to the leaves.
1811+
- restarts are recursive (supervised actors are part of the state)
1812+
- restarts are more frequent near the leaves
1813+
- avoid restarting Actors with important state
1814+
*** Application to Receptionist
1815+
- Always stop Contoller if it has a problem.
1816+
- React to =Terminated= to catch cases where =no Result= was sent.
1817+
- Discard =Terminated= after =Result= was sent.
1818+
#+begin_src scala
1819+
class Receptionist extends Actor {
1820+
override def supervisorStrategy = SupervisorStrategy.stoppingStrategy
1821+
...
1822+
}
1823+
#+end_src
1824+
*** Interjection: the EventStream
1825+
Actors can direct messages only at known address.
1826+
1827+
The EventStream allows publication of messages to an unknown audience.
1828+
1829+
Every actor can optionally subscribe to (parts of) the EventStream.
1830+
#+begin_src scala
1831+
trait EventStream {
1832+
def subscribe(subscriber: ActorRef, topic: Class[_]): Boolean
1833+
def unsubscribe(subscriber: ActorRef, topic: Class[_]): Boolean
1834+
def unsubscribe(subscriber: ActorRef): Unit
1835+
def publish(event: AnyRef): Unit
1836+
}
1837+
1838+
class Listener extends Actor {
1839+
context.system.eventStream.subscribe(self, classOf[LogEvent])
1840+
def receive = {
1841+
case e: LogEvent => ...
1842+
}
1843+
override def postStop(): Unit = {
1844+
context.system.eventStream.unsubscribe(self)
1845+
}
1846+
}
1847+
#+end_src
1848+
*** Where do Unhandled Messages Go?
1849+
=Actor.Receive= is a partial function, the behavior may not apply.
1850+
1851+
=Unhandled= messages are passed into the =unhandled= method:
1852+
#+begin_src scala
1853+
trait Actor {
1854+
...
1855+
def unhandled(message: Any): Unit = message match {
1856+
case Terminated(target) => throw new DeathPactException(target)
1857+
case msg =>
1858+
context.system.eventStream.publish(UnhandledMessage(msg, sender, self))
1859+
}
1860+
}
1861+
#+end_src

0 commit comments

Comments
 (0)