An actor is a stateful resource that can be started, stopped and restarted.
Akka Typed and Akka Classic actors show some differences in their default lifecycle.
Actors offer lifecycle hooks to execute custom tasks on:
Classic Actors rely on methods that can be overridden: preStart
, preRestart
,
postRestart
and postStop
.
Akka Typed uses a combination of signals (PreRestart
and PostStop
) and
code inside Behaviors.setup
.
If the actor encounters some failure and throws an Exception
the actor will:
An important distinction:
Supervision is the act of deciding how to react when Failures occur.
There 3 different main approaches when supervising:
Default Supervision strategy differs between Akka Classic and Typed:
Supervision strategy is traditionally defined within the parent actor. However, in Akka Typed, it could also be defined withing the same actor.
import akka.actor.{Actor, ActorRef, AllForOneStrategy, Props, SupervisorStrategy}
import scala.concurrent.duration._
import io.github.jlprat.akka.lnl.supervision.classic.SupervisionExample.Child._
import io.github.jlprat.akka.lnl.supervision.classic.SupervisionExample.Parent.Init
object SupervisionExample {
object Child {
sealed trait Command
case class Save(tag: String, value: Int) extends Command
case class Retrieve(key: Key) extends Command
case class Key(id: String)
case class Stored(product: Product)
case class Product(hash: String, tag: String, value: Int)
def props(): Props = Props(classOf[Child])
}
class Child extends Actor {
var storage: Map[String, Product] = Map.empty
override def receive: Actor.Receive = {
case Save(tag, value) =>
val hash = s"${tag.hashCode()}"
storage = storage.updated(hash, Product(hash, tag, value))
sender() ! Key(hash)
case Retrieve(key) if !storage.contains(key.id) =>
throw new IllegalStateException("No such key!")
case Retrieve(key) =>
sender() ! Stored(storage(key.id))
}
}
object Parent {
sealed trait Command
case object Init extends Command
def props(): Props = Props(classOf[Parent])
}
class Parent extends Actor {
override def receive: Actor.Receive = {
case Init =>
val child = context.actorOf(Child.props())
context.become(initialized(child))
case _ => throw new IllegalStateException("Not yet initialized")
}
def initialized(child: ActorRef): Actor.Receive = {
case Init => throw new IllegalStateException("Already initialized")
case msg => child ! msg
}
override def supervisorStrategy: SupervisorStrategy =
AllForOneStrategy(3, 1.minute) {
case _: IllegalStateException => SupervisorStrategy.restart
case _ => SupervisorStrategy.escalate
}
}
}
import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._
object SupervisionExample {
sealed trait Command
case object Init extends Command
sealed trait NodeCommand extends Command
case class Save(tag: String, value: Int, replyTo: ActorRef[Key])
extends NodeCommand
case class Retrieve(key: Key, replyTo: ActorRef[Stored]) extends NodeCommand
case class Key(id: String)
case class Stored(product: Product)
case class Product(hash: String, tag: String, value: Int)
def supervise(behavior: Behavior[NodeCommand]): Behavior[NodeCommand] =
Behaviors
.supervise(behavior)
.onFailure[IllegalStateException](SupervisorStrategy
.restart.withLimit(3, 1.minute))
def apply(): Behavior[Command] =
Behaviors.receive {
case (context, Init) =>
val child = context.spawnAnonymous(supervise(store()))
initialized(child)
case _ => throw new IllegalStateException("Not yet initialized")
}
def initialized(child: ActorRef[NodeCommand]): Behavior[Command] =
Behaviors.receiveMessage {
case Init => throw new IllegalStateException("Already initialized")
case c @ Save(_, _, _) =>
child.tell(c)
Behaviors.same
case c @ Retrieve(_, _) =>
child.tell(c)
Behaviors.same
}
def store(storage: Map[String, Product] = Map.empty): Behavior[NodeCommand] =
Behaviors.receiveMessage {
case Save(tag, value, replyTo) =>
val hash = s"${tag.hashCode()}"
replyTo ! Key(hash)
store(storage.updated(hash, Product(hash, tag, value)))
case Retrieve(key, _) if !storage.contains(key.id) =>
throw new IllegalStateException("No such key!")
case Retrieve(key, replyTo) =>
replyTo ! Stored(storage(key.id))
Behaviors.same
}
}
There are other possibilities for supervision like the back-off ones where actors will wait for some specified time before restarting.
There are several possibilities:
preStart
or Behaviors.setup
hooksclass Initialization extends Actor {
// Constructor initialization
initDB()
// Pre Start Hook initialization
override def preStart(): Unit = {
initDB()
}
override def receive: Actor.Receive = {
case Init =>
// Initialization via Message
initDB()
case _ => ??? // normal actor work
}
private def initDB(): Unit = ()
}
All possibilities are shown, do not use all of them for the same task.
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
object Initialization {
sealed trait Command
case object Init extends Command
def apply(): Behavior[Command] = Behaviors.setup{ _ =>
// Setup hook
initDB()
Behaviors.receiveMessage{
case Init =>
// Initialize via message
initDB()
Behaviors.same
case _ => //do normal things
Behaviors.same
}
}
private def initDB(): Unit = ()
}
All possibilities are shown, do not use all of them for the same task.
How can we execute different tasks on restart than the start ones?
class Restart extends Actor {
override def preStart(): Unit = {
// We create our children, but we want to keep them on restarts
initChildren()
}
/**
* default implementation:
*
* def preRestart(@unused reason: Throwable, @unused message: Option[Any]): Unit = {
* context.children.foreach { child =>
* context.unwatch(child)
* context.stop(child)
* }
* postStop()
* }
*/
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
// We don't stop our children anymore!
postStop()
}
/**
* Default implementation
* def postRestart(@unused reason: Throwable): Unit = {
* preStart()
* }
*/
override def postRestart(reason: Throwable): Unit = {
// we do not call `preStart` so no new children are created
}
...
}
object Restart {
sealed trait Command
case object DoThings extends Command
def apply(): Behavior[Command] =
Behaviors.setup { context =>
// This will be executed on Start
createChildren(context)
Behaviors.supervise {
// This code will be executed on Start and Restart
Behaviors.receiveMessage[Command] {
case DoThings =>
//Do things
Behaviors.same
}
}.onFailure[Exception](SupervisorStrategy.restart.withStopChildren(false))
}
def createChildren(context: ActorContext[Command]): Unit = ()
}
To successfully realize a graceful shutdown of your actor system, you should define to handle this situation in your top level actor a message.
import akka.actor.{Actor, ActorLogging, Props}
import io.github.jlprat.akka.lnl.supervision.classic.Shutdown._
object Shutdown {
sealed trait Command
case object Init extends Command
case object GracefulShutdown extends Command
def props(): Props = Props(classOf[Shutdown])
}
class Shutdown extends Actor with ActorLogging {
override def receive: Actor.Receive = {
case Init => spawnChildren()
case GracefulShutdown =>
log.error("I'm not yet initialized!")
}
def initialized: Actor.Receive = {
case Init => log.error("I'm already initialized!")
case GracefulShutdown =>
log.info("Pre Cleaning Up Tasks")
// After this actor is stopped, all children will also be stopped
context.stop(self)
}
override def postStop(): Unit = {
log.info("Post Clean Up Tasks")
}
def spawnChildren(): Unit = ()
}
Akka Classic offers a handy method to wait for this GracefulShutdown:
import akka.pattern.gracefulStop
import scala.concurrent.Await
try {
val stopped: Future[Boolean] =
gracefulStop(actorRef, 5 seconds, Shutdown.GracefulShutdown)
Await.result(stopped, 6 seconds)
// the actor has been stopped
} catch {
// the actor wasn't stopped within 5 seconds
case e: akka.pattern.AskTimeoutException =>
}
Alternatively, use PoisonPill
:
import akka.actor.typed.{Behavior, PostStop}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}
import scala.annotation.nowarn
object Shutdown {
sealed trait Command
case object Init extends Command
case object GracefulShutdown extends Command
def apply(): Behavior[Command] =
Behaviors.receive {
case (context, Init) =>
spawnChildren(context)
initialized()
case (context, _) =>
context.log.error("I'm not initialized!")
Behaviors.same
}
def initialized(): Behavior[Command] =
Behaviors
.receive[Command] {
case (context, GracefulShutdown) =>
context.log.info("Pre Cleaning Up Tasks")
// After this actor is stopped, all children will also be stopped
Behaviors.stopped
case (context, Init) =>
context.log.error("I'm already initialized")
Behaviors.same
}
.receiveSignal {
case (context, PostStop) =>
context.log.info("Post Cleaning Up Tasks")
Behaviors.same
}
@nowarn
def spawnChildren(context: ActorContext[Command]): Unit = ()
}
You can get notified when any other actor terminates (voluntarily or via Exception
).
Useful for cases when an actor spins children to perform tasks, and wants to notify end users the status of given job.
class Parent extends Actor with ActorLogging {
var jobs: Map[ActorRef, ActorRef] = Map.empty
override def receive: Actor.Receive = {
case StartJob(code) =>
val child = context.actorOf(Child.props(code))
val _ = context.watch(child)
jobs = jobs + (child -> sender())
case Terminated(ref) =>
jobs(ref) ! "Done"
}
}
import akka.actor.typed.{ActorRef, Behavior, ChildFailed, Terminated}
import akka.actor.typed.scaladsl.Behaviors
object Watching {
sealed trait JobState
case object Finished extends JobState
case object Failed extends JobState
sealed trait Command
case class StartJob(code: String, replyTo: ActorRef[JobState]) extends Command
var jobs: Map[ActorRef[Nothing], ActorRef[JobState]] = Map.empty
def apply(): Behavior[Command] =
Behaviors
.receive[Command] {
case (context, StartJob(code, replyTo)) =>
val child = context.spawnAnonymous[Nothing](job(code))
context.watch(child)
jobs = jobs + (child -> replyTo)
Behaviors.same
}
.receiveSignal {
case (_, Terminated(ref)) =>
jobs(ref).tell(Finished)
jobs = jobs - ref
Behaviors.same
case (_, ChildFailed(ref, _)) =>
jobs(ref).tell(Failed)
jobs = jobs - ref
Behaviors.same
}
}
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
object WatchingAlt {
sealed trait JobState
case object Finished extends JobState
sealed trait Command
case class StartJob(code: String, replyTo: ActorRef[JobState]) extends Command
case class FinishedJob(replyTo: ActorRef[JobState]) extends Command
def apply(): Behavior[Command] =
Behaviors
.receive[Command] {
case (context, StartJob(code, replyTo)) =>
val child = context.spawnAnonymous[Nothing](job(code))
context.watchWith(child, FinishedJob(replyTo))
Behaviors.same
case (_, FinishedJob(replyTo)) =>
replyTo ! Finished
Behaviors.same
}
}
watchWith
is present on both Akka Typed and Akka Classic. Ideal case for it, is when children
failures can be ignored.