Akka Logo

Supervising Akka

Josep Prat  GitHub Logo - 2020/10/28

Link to the companion Repo

QR code for the repository

Questions:

  • What is the Actor Lifecycle?
  • How to supervise Actors?
  • How to initialize tasks in Actors?
  • How to deal with Restarts in Actors?
  • How to gracefully finish tasks in Actors?
  • How to watch my children?

What is the Actor Lifecycle?

An actor is a stateful resource that can be started, stopped and restarted.
Akka Typed and Akka Classic actors show some differences in their default lifecycle.

What is the Actor Lifecycle?

Actors offer lifecycle hooks to execute custom tasks on:

  • Startup
  • Shutdown
  • Restart

What is the Actor Lifecycle?

Classic Actors rely on methods that can be overridden: preStart, preRestart, postRestart and postStop.

What is the Actor Lifecycle?

Akka Typed uses a combination of signals (PreRestart and PostStop) and code inside Behaviors.setup.

What is the Actor Lifecycle?

If the actor encounters some failure and throws an Exception the actor will:

  • Restart in Akka Classic
  • Stop in Akka Typed

How to supervise Actors?

An important distinction:

  • Error: Is an expected wrong behavior, e.g., a validation error
  • Failure: Is an unexpected event that prevents the actor from continuing their work, e.g., broken DB connection

How to supervise Actors?

Supervision is the act of deciding how to react when Failures occur.

How to supervise Actors?

There 3 different main approaches when supervising:

  • Resume, keeping the current state
  • Restart, discarding the current state
  • Stop

How to supervise Actors?

Default Supervision strategy differs between Akka Classic and Typed:

  • Akka Classic: The actor is restarted
  • Akka Typed: The actor is stopped

How to supervise Actors?

Supervision strategy is traditionally defined within the parent actor. However, in Akka Typed, it could also be defined withing the same actor.

How to supervise Actors? - Akka Classic

import akka.actor.{Actor, ActorRef, AllForOneStrategy, Props, SupervisorStrategy}
import scala.concurrent.duration._

import io.github.jlprat.akka.lnl.supervision.classic.SupervisionExample.Child._
import io.github.jlprat.akka.lnl.supervision.classic.SupervisionExample.Parent.Init

object SupervisionExample {

  object Child {

    sealed trait Command
    case class Save(tag: String, value: Int) extends Command
    case class Retrieve(key: Key)            extends Command

    case class Key(id: String)
    case class Stored(product: Product)
    case class Product(hash: String, tag: String, value: Int)

    def props(): Props = Props(classOf[Child])

  }

  class Child extends Actor {

    var storage: Map[String, Product] = Map.empty

    override def receive: Actor.Receive = {
      case Save(tag, value) =>
        val hash = s"${tag.hashCode()}"
        storage = storage.updated(hash, Product(hash, tag, value))
        sender() ! Key(hash)
      case Retrieve(key) if !storage.contains(key.id) =>
        throw new IllegalStateException("No such key!")
      case Retrieve(key) =>
        sender() ! Stored(storage(key.id))
    }
  }

  object Parent {
    sealed trait Command
    case object Init extends Command

    def props(): Props = Props(classOf[Parent])
  }

  class Parent extends Actor {

    override def receive: Actor.Receive = {
      case Init =>
        val child = context.actorOf(Child.props())
        context.become(initialized(child))
      case _ => throw new IllegalStateException("Not yet initialized")
    }

    def initialized(child: ActorRef): Actor.Receive = {
      case Init => throw new IllegalStateException("Already initialized")
      case msg  => child ! msg
    }

    override def supervisorStrategy: SupervisorStrategy =
      AllForOneStrategy(3, 1.minute) {
        case _: IllegalStateException => SupervisorStrategy.restart
        case _                        => SupervisorStrategy.escalate
      }
  }
}

How to supervise Actors? - Akka Typed

import akka.actor.typed.{ActorRef, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.duration._

object SupervisionExample {

  sealed trait Command
  case object Init extends Command

  sealed trait NodeCommand extends Command
  case class Save(tag: String, value: Int, replyTo: ActorRef[Key])
    extends NodeCommand
  case class Retrieve(key: Key, replyTo: ActorRef[Stored]) extends NodeCommand

  case class Key(id: String)
  case class Stored(product: Product)
  case class Product(hash: String, tag: String, value: Int)

  def supervise(behavior: Behavior[NodeCommand]): Behavior[NodeCommand] =
    Behaviors
      .supervise(behavior)
      .onFailure[IllegalStateException](SupervisorStrategy
        .restart.withLimit(3, 1.minute))

  def apply(): Behavior[Command] =
    Behaviors.receive {
      case (context, Init) =>
        val child = context.spawnAnonymous(supervise(store()))
        initialized(child)
      case _ => throw new IllegalStateException("Not yet initialized")
    }

  def initialized(child: ActorRef[NodeCommand]): Behavior[Command] =
    Behaviors.receiveMessage {
      case Init => throw new IllegalStateException("Already initialized")
      case c @ Save(_, _, _) =>
        child.tell(c)
        Behaviors.same
      case c @ Retrieve(_, _) =>
        child.tell(c)
        Behaviors.same
    }

  def store(storage: Map[String, Product] = Map.empty): Behavior[NodeCommand] =
    Behaviors.receiveMessage {
      case Save(tag, value, replyTo) =>
        val hash = s"${tag.hashCode()}"
        replyTo ! Key(hash)
        store(storage.updated(hash, Product(hash, tag, value)))
      case Retrieve(key, _) if !storage.contains(key.id) =>
        throw new IllegalStateException("No such key!")
      case Retrieve(key, replyTo) =>
        replyTo ! Stored(storage(key.id))
        Behaviors.same
    }
}

How to supervise Actors?

There are other possibilities for supervision like the back-off ones where actors will wait for some specified time before restarting.

How to initialize tasks in Actors?

There are several possibilities:

  • In the constructor (if you use this style in Akka Typed)
  • Using preStart or Behaviors.setup hooks
  • Via initialization message

How to initialize tasks in Actors? - Akka Classic

class Initialization extends Actor {

  // Constructor initialization
  initDB()

  // Pre Start Hook initialization
  override def preStart(): Unit = {
    initDB()
  }

  override def receive: Actor.Receive = {
    case Init => 
      // Initialization via Message
      initDB()
    case _ => ??? // normal actor work
  }

  private def initDB(): Unit = ()
  
}
All possibilities are shown, do not use all of them for the same task.

How to initialize tasks in Actors? - Akka Typed

import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object Initialization {
  sealed trait Command
  case object Init extends Command

  def apply(): Behavior[Command] = Behaviors.setup{ _ =>

    // Setup hook
    initDB()

    Behaviors.receiveMessage{
      case Init => 
        // Initialize via message
        initDB()
        Behaviors.same
      case _ => //do normal things
        Behaviors.same
    }
  }

  private def initDB(): Unit = ()
}
All possibilities are shown, do not use all of them for the same task.

How to initialize tasks in Actors?

Constructor way:

  • Always ready for the actor
  • Also executed on restarts
  • Lacks customization

How to initialize tasks in Actors?

Hooks

  • Separation of concerns
  • Can be customized, i.e. execute on start but not on restart

How to initialize tasks in Actors?

Message

  • Extremely customizable
  • Useful for when more data/info is needed to initialize the task

How to deal with Restarts in Actors?

How can we execute different tasks on restart than the start ones?

How to deal with Restarts in Actors? - Akka Classic

class Restart extends Actor {

  override def preStart(): Unit = {
    // We create our children, but we want to keep them on restarts
    initChildren()
  }

  /**
    * default implementation:
    *
    * def preRestart(@unused reason: Throwable, @unused message: Option[Any]): Unit = {
    *   context.children.foreach { child =>
    *     context.unwatch(child)
    *     context.stop(child)
    *   }
    *   postStop()
    * }
    */
  override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
    // We don't stop our children anymore!
    postStop()
  }

  /**
    * Default implementation
    * def postRestart(@unused reason: Throwable): Unit = {
    *   preStart()
    * }
    */
  override def postRestart(reason: Throwable): Unit = {
    // we do not call `preStart` so no new children are created
  }
  ...
}

How to deal with Restarts in Actors? - Akka Typed

object Restart {
  sealed trait Command
  case object DoThings extends Command

  def apply(): Behavior[Command] =
    Behaviors.setup { context =>
      // This will be executed on Start
      createChildren(context)
      Behaviors.supervise {
        // This code will be executed on Start and Restart
        Behaviors.receiveMessage[Command] {
          case DoThings =>
            //Do things
            Behaviors.same
        }
      }.onFailure[Exception](SupervisorStrategy.restart.withStopChildren(false))
    }

  def createChildren(context: ActorContext[Command]): Unit = ()
}

How to gracefully finish tasks in Actors?

To successfully realize a graceful shutdown of your actor system, you should define to handle this situation in your top level actor a message.

How to gracefully finish tasks in Actors? - Akka Classic

import akka.actor.{Actor, ActorLogging, Props}
import io.github.jlprat.akka.lnl.supervision.classic.Shutdown._

object Shutdown {

  sealed trait Command
  case object Init             extends Command
  case object GracefulShutdown extends Command

  def props(): Props = Props(classOf[Shutdown])
}

class Shutdown extends Actor with ActorLogging {

  override def receive: Actor.Receive = {
    case Init => spawnChildren()
    case GracefulShutdown =>
      log.error("I'm not yet initialized!")
  }

  def initialized: Actor.Receive = {
    case Init => log.error("I'm already initialized!")
    case GracefulShutdown =>
      log.info("Pre Cleaning Up Tasks")
      // After this actor is stopped, all children will also be stopped
      context.stop(self)
  }

  override def postStop(): Unit = {
    log.info("Post Clean Up Tasks")
  }

  def spawnChildren(): Unit = ()
}

How to gracefully finish tasks in Actors? - Akka Classic

Akka Classic offers a handy method to wait for this GracefulShutdown:

import akka.pattern.gracefulStop
import scala.concurrent.Await

try {
  val stopped: Future[Boolean] = 
    gracefulStop(actorRef, 5 seconds, Shutdown.GracefulShutdown)
  Await.result(stopped, 6 seconds)
  // the actor has been stopped
} catch {
  // the actor wasn't stopped within 5 seconds
  case e: akka.pattern.AskTimeoutException =>
}

How to gracefully finish tasks in Actors? - Akka Classic

Alternatively, use PoisonPill:

  • No special pre clean up tasks
  • Message will be enqueued like any other message
  • Actor will shutdown itself when processing this message

How to gracefully finish tasks in Actors? - Akka Typed

import akka.actor.typed.{Behavior, PostStop}
import akka.actor.typed.scaladsl.{ActorContext, Behaviors}

import scala.annotation.nowarn

object Shutdown {
  sealed trait Command
  case object Init             extends Command
  case object GracefulShutdown extends Command

  def apply(): Behavior[Command] =
    Behaviors.receive {
      case (context, Init) =>
        spawnChildren(context)
        initialized()
      case (context, _) =>
        context.log.error("I'm not initialized!")
        Behaviors.same
    }

  def initialized(): Behavior[Command] =
    Behaviors
      .receive[Command] {
        case (context, GracefulShutdown) =>
          context.log.info("Pre Cleaning Up Tasks")
          // After this actor is stopped, all children will also be stopped
          Behaviors.stopped
        case (context, Init) =>
          context.log.error("I'm already initialized")
          Behaviors.same
      }
      .receiveSignal {
        case (context, PostStop) =>
          context.log.info("Post Cleaning Up Tasks")
          Behaviors.same
      }

  @nowarn
  def spawnChildren(context: ActorContext[Command]): Unit = ()
}

How to watch my children?

You can get notified when any other actor terminates (voluntarily or via Exception).

Useful for cases when an actor spins children to perform tasks, and wants to notify end users the status of given job.

How do I watch my children? - Akka Classic

class Parent extends Actor with ActorLogging {

  var jobs: Map[ActorRef, ActorRef] = Map.empty

  override def receive: Actor.Receive = {
    case StartJob(code) =>
      val child = context.actorOf(Child.props(code))
      val _     = context.watch(child)
      jobs = jobs + (child -> sender())
    case Terminated(ref) =>
      jobs(ref) ! "Done"
  }
}

How do I watch my children? - Akka Typed

import akka.actor.typed.{ActorRef, Behavior, ChildFailed, Terminated}
import akka.actor.typed.scaladsl.Behaviors

object Watching {

  sealed trait JobState
  case object Finished extends JobState
  case object Failed   extends JobState

  sealed trait Command
  case class StartJob(code: String, replyTo: ActorRef[JobState]) extends Command

  var jobs: Map[ActorRef[Nothing], ActorRef[JobState]] = Map.empty

  def apply(): Behavior[Command] =
    Behaviors
      .receive[Command] {
        case (context, StartJob(code, replyTo)) =>
          val child = context.spawnAnonymous[Nothing](job(code))
          context.watch(child)
          jobs = jobs + (child -> replyTo)
          Behaviors.same
      }
      .receiveSignal {
        case (_, Terminated(ref)) =>
          jobs(ref).tell(Finished)
          jobs = jobs - ref
          Behaviors.same
        case (_, ChildFailed(ref, _)) =>
          jobs(ref).tell(Failed)
          jobs = jobs - ref
          Behaviors.same
      }
}

How do I watch my children?

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object WatchingAlt {

  sealed trait JobState
  case object Finished extends JobState

  sealed trait Command
  case class StartJob(code: String, replyTo: ActorRef[JobState]) extends Command
  case class FinishedJob(replyTo: ActorRef[JobState]) extends Command

  def apply(): Behavior[Command] =
    Behaviors
      .receive[Command] {
        case (context, StartJob(code, replyTo)) =>
          val child = context.spawnAnonymous[Nothing](job(code))
          context.watchWith(child, FinishedJob(replyTo))
          Behaviors.same
        case (_, FinishedJob(replyTo)) =>
          replyTo ! Finished
          Behaviors.same
      }
}

How do I watch my children?

watchWith is present on both Akka Typed and Akka Classic. Ideal case for it, is when children failures can be ignored.

Thanks! And leave feedback!

Write Feedback!

Josep Prat  GitHub Logo - 2020/10/28