
Stashing and Persisting Akka

Josep Prat - 2020/11/20

Link to the companion Repo


Questions:

  • How to keep messages on initialization?
  • How to avoid unstash unresponsiveness?
  • How to keep messages while performing costly operations?
  • What is Akka Persistence about?
  • How to build an Event Sourced App?
  • Why should we save Snapshots?

How to keep messages on initialization?

We are familiar with this initialization pattern:

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object DropBeforeInit {

  sealed trait Status
  case class Done(primes: List[Int]) extends Status
  case object Processing             extends Status
  case object Discarded              extends Status

  sealed trait Command
  case object Initialize                                            extends Command
  case class Primes(numberOfPrimes: Int, replyTo: ActorRef[Status]) extends Command

  def primeStream(s: LazyList[Int]): LazyList[Int] =
    LazyList.cons(s.head, primeStream(s.tail filter { _ % s.head != 0 }))

  def apply(): Behavior[Command] =
    Behaviors.receive {
      case (context, Initialize) =>
        context.log.info("Initializing - doing some costly things")
        initialized()
      case (context, Primes(numberOfPrimes, replyTo)) =>
        context.log.error("Request to calculate {} primes was discarded", numberOfPrimes)
        replyTo.tell(Discarded)
        Behaviors.same
    }

  def initialized(): Behavior[Command] =
    Behaviors.receiveMessage {
      case Primes(numberOfPrimes, replyTo) =>
        replyTo.tell(Processing)
        val nPrimes = primeStream(LazyList.from(2)).take(numberOfPrimes)
        replyTo.tell(Done(nPrimes.toList))
        Behaviors.same
      case Initialize => throw new IllegalStateException("Already initialized")
    }
}

How to keep messages on initialization?

What if we would like to keep all those discarded messages? After all, it's not the client's fault that we weren't initialized yet!

How to keep messages on initialization? - Akka Classic

import akka.actor.{Actor, ActorLogging, Props, Stash}
import io.github.jlprat.akka.lnl.stash.classic.StashBeforeInit._

object StashBeforeInit {

  sealed trait Status
  case class Done(primes: List[Int]) extends Status
  case object Processing             extends Status

  sealed trait Command
  case object Initialize                 extends Command
  case class Primes(numberOfPrimes: Int) extends Command

  def props(): Props = Props(classOf[StashBeforeInit])

  private def primeStream(s: LazyList[Int]): LazyList[Int] =
    LazyList.cons(s.head, primeStream(s.tail filter { _ % s.head != 0 }))

}

class StashBeforeInit extends Actor with Stash with ActorLogging {

  override def receive: Actor.Receive = {
    case Initialize =>
      log.info("Initializing - doing some costly things")
      unstashAll()
      context.become(initialized)
    case Primes(numberOfPrimes) => 
      log.info("Stashing request to calculate {} number of primes", numberOfPrimes)
      stash()
  }

  def initialized: Actor.Receive = {
    case Initialize =>
      throw new IllegalStateException("Already initialized")
    case Primes(numberOfPrimes) =>
      sender() ! Processing
      // We could store these already calculated primes in a field,
      // but for the sake of performing something costly, we calculate it every time
      val nPrimes = primeStream(LazyList.from(2)).take(numberOfPrimes)
      sender() ! Done(nPrimes.toList)
  }
}

How to keep messages on initialization? - Akka Classic

The size of the stash is determined by the stash-capacity setting in the mailbox configuration. By default, the stash is unbounded.

How to keep messages on initialization? - Akka Classic

Be aware that the Stash trait must be mixed in before any other trait that overrides the preRestart hook.

Also, Stash only works with deque-based (double-ended queue) mailboxes.
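Below is a minimal sketch of a bounded stash in Akka Classic. It assumes a hypothetical mailbox id, `bounded-stash-mailbox`, and a hypothetical `StashingActor`. The mailbox is declared as a deque-based mailbox with a `stash-capacity`, and the actor is bound to it through `Props.withMailbox`. Mixing in `Stash` already makes Akka pick a deque-based mailbox by default. An explicit mailbox like this one is therefore mostly useful for changing settings such as the stash capacity. Once the stash is full, `stash()` throws `akka.actor.StashOverflowException`.

```scala
import akka.actor.{Actor, Props, Stash}
import com.typesafe.config.{Config, ConfigFactory}

object BoundedStashMailbox {

  // Hypothetical mailbox id: a deque-based mailbox (required by Stash)
  // whose stash holds at most 50 messages.
  val config: Config = ConfigFactory.parseString(
    """
      |bounded-stash-mailbox {
      |  mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox"
      |  stash-capacity = 50
      |}
      |""".stripMargin
  )

  // Hypothetical actor: stashes everything until it receives "init".
  class StashingActor extends Actor with Stash {
    override def receive: Receive = {
      case "init" =>
        unstashAll()
        context.become(ready)
      case _ =>
        stash() // throws StashOverflowException once 50 messages are stashed
    }

    // Echoes every message back, just to have something to do once ready.
    private def ready: Receive = {
      case msg => sender() ! msg
    }
  }

  // Props bound to the custom mailbox id declared above.
  def props: Props = Props(new StashingActor).withMailbox("bounded-stash-mailbox")
}
```

To use it, create the ActorSystem with this configuration, for example `ActorSystem("demo", BoundedStashMailbox.config.withFallback(ConfigFactory.load()))`, so that the mailbox id resolves. Then spawn the actor with `system.actorOf(BoundedStashMailbox.props)`.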

How to keep messages on initialization? - Akka Typed

import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors

object StashBeforeInit {

  sealed trait Status
  case class Done(primes: List[Int]) extends Status
  case object Processing             extends Status

  sealed trait Command
  case object Initialize                                            extends Command
  case class Primes(numberOfPrimes: Int, replyTo: ActorRef[Status]) extends Command

  def primeStream(s: LazyList[Int]): LazyList[Int] =
    LazyList.cons(s.head, primeStream(s.tail filter { _ % s.head != 0 }))

  def apply(): Behavior[Command] =
    Behaviors.withStash(25) { stashBuffer =>
      Behaviors.receive {
        case (context, Initialize) =>
          context.log.info("Initializing - doing some costly things")
          stashBuffer.unstashAll(initialized())
        case (context, msg @ Primes(numberOfPrimes, _)) =>
          context.log.info("Stashing request to calculate {} number of primes", numberOfPrimes)
          stashBuffer.stash(msg)
          Behaviors.same
      }
    }

  def initialized(): Behavior[Command] =
    Behaviors.receiveMessage {
      case Primes(numberOfPrimes, replyTo) =>
        replyTo.tell(Processing)
        // We could store these already calculated primes in a field,
        // but for the sake of performing something costly, we calculate it every time
        val nPrimes = primeStream(LazyList.from(2)).take(numberOfPrimes)
        replyTo.tell(Done(nPrimes.toList))
        Behaviors.same
      case Initialize => throw new IllegalStateException("Already initialized")
    }
}

How to keep messages on initialization?

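When unstashing, stashed messages take priority over the mailbox. In Akka Classic, unstashAll() prepends them to the mailbox. In Akka Typed, unstashAll processes them immediately with the given behavior. Either way, the actor first processes all the unstashed messages, and only then any other messages in its mailbox.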

How to avoid unstash unresponsiveness?

What happens if we stash 300 messages and each one takes 200 ms to process?

That is 300 × 200 ms = 60 s. For one full minute the actor cannot process any new incoming message, leaving clients wondering whether the app is responding at all.

What can we do?

How to avoid unstash unresponsiveness?

If you use Akka Classic, you are out of luck. The only thing you can do is configure a small stash capacity, so that the time spent unstashing remains manageable.

How to avoid unstash unresponsiveness?

If you use Akka Typed though, you are lucky!

Instead of unstashing all messages, one can unstash in smaller chunks, so incoming messages can be processed between batches.
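Here is a back-of-the-envelope comparison. It uses the numbers from the question above (300 stashed messages at 200 ms each) and the chunk size of 5 used in ChunkedUnstash. This only models the timings; it is plain Scala, not Akka code.

```scala
import scala.concurrent.duration._

object UnstashBudget {
  val stashedMessages: Int = 300
  val costPerMessage: FiniteDuration = 200.millis
  val chunkSize: Int = 5 // the chunk size used by ChunkedUnstash

  // unstashAll: the whole stash is drained before any new message is looked at.
  val unstashAllBlocking: FiniteDuration = costPerMessage * stashedMessages.toLong

  // Chunked: the actor is busy for one chunk at a time, then goes back to its mailbox.
  val perChunk: FiniteDuration = costPerMessage * chunkSize.toLong

  // Number of chunks needed to drain the whole stash.
  val chunks: Int = math.ceil(stashedMessages.toDouble / chunkSize).toInt
}
```

With unstashAll, the actor is busy for a full minute. With chunks of 5, it returns to its mailbox roughly every second. A fresh message then waits about one or two chunks instead of the whole stash, at the cost of draining the stash a bit later.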

How to avoid unstash unresponsiveness?

import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout

import scala.concurrent.duration._
import scala.io.StdIn

object ChunkedUnstash {

  case class Done(primes: List[Int])

  sealed trait Command
  case object Initialize                                          extends Command
  case class Primes(numberOfPrimes: Int, replyTo: ActorRef[Done]) extends Command
  private case object ResumeUnstash                               extends Command
  private case class Stashed(msg: Command)                        extends Command

  def primeStream(s: LazyList[Int]): LazyList[Int] =
    LazyList.cons(s.head, primeStream(s.tail filter { _ % s.head != 0 }))

  def apply(): Behavior[Command] =
    Behaviors.withStash(300) { stashBuffer =>
      def uninitialized(): Behavior[Command] =
        Behaviors.receive {
          case (context, Initialize) =>
            context.log.info("Initializing - doing some costly things")
            if (stashBuffer.size > 0) {
              context.self.tell(ResumeUnstash)
              stashBuffer.unstash(initialized(), 5, Stashed)
            } else {
              initialized()
            }
          case (context, msg @ Primes(numberOfPrimes, _)) =>
            context.log.info("Stashing request to calculate {} primes", numberOfPrimes)
            stashBuffer.stash(msg)
            Behaviors.same
          case _ => Behaviors.unhandled
        }

      def initialized(): Behavior[Command] =
        Behaviors.receive {
          case (context, Stashed(Primes(numberOfPrimes, replyTo))) =>
            context.log.info("Processing a previously stashed message")
            val nPrimes = primeStream(LazyList.from(2)).take(numberOfPrimes)
            replyTo.tell(Done(nPrimes.toList))
            Behaviors.same
          case (context, Primes(numberOfPrimes, replyTo)) =>
            context.log.info("Processing a fresh new message")
            val nPrimes = primeStream(LazyList.from(2)).take(numberOfPrimes)
            replyTo.tell(Done(nPrimes.toList))
            Behaviors.same
          case (context, ResumeUnstash) =>
            if (stashBuffer.size > 0) {
              context.log.info("Finished one batch of unstashing")
              context.self.tell(ResumeUnstash)
              stashBuffer.unstash(initialized(), 5, Stashed)
            } else {
              context.log.info("Finished Unstashing")
              Behaviors.same
            }
          case (context, Stashed(msg)) =>
            context.log.error("Wrong message in stash {}", msg)
            throw new IllegalStateException("Wrong message in stash")
          case (_, Initialize) => throw new IllegalStateException("Already initialized")
        }

      uninitialized()
    }
}

How to avoid unstash unresponsiveness?

Go and run ChunkedUnstash.

The logs should show that messages received while unstashing are processed between batches, rather than after the whole stash has been drained.

How to keep messages while performing costly operations?

Use case: we have some costly writes. Until a write has been persisted, we should not process any operation that could conflict with it.

How to keep messages while performing costly operations?

import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.actor.typed.scaladsl.Behaviors
import akka.Done

import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.annotation.nowarn

object SimpleKeyValueStore {

  sealed trait PutResponse
  case class Stored(key: String) extends PutResponse
  case class Failed(key: String) extends PutResponse

  sealed trait GetResponse
  case class Retrieved(value: String) extends GetResponse
  case class Missing(key: String)     extends GetResponse

  sealed trait Command
  case class Put(key: String, value: String, replyTo: ActorRef[PutResponse]) extends Command
  case class Get(key: String, replyTo: ActorRef[GetResponse])                extends Command

  private case class KeyValueStored(key: String, value: String, replyTo: ActorRef[PutResponse])
      extends Command
  private case class KeyValueFailed(key: String, value: String, replyTo: ActorRef[PutResponse])
      extends Command

  def apply(storage: Map[String, String] = Map.empty): Behavior[Command] =
    Behaviors.setup { context =>
      implicit val ec: ExecutionContext =
        context.system.dispatchers.lookup(DispatcherSelector.blocking())
      Behaviors.receiveMessage {
        case Put(key, value, replyTo) =>
          val saved = saveToDatabase(key, value)
          context.pipeToSelf(saved) {
            case Failure(exception) =>
              context.log.error("Error Saving to DB", exception)
              KeyValueFailed(key, value, replyTo)
            case Success(_) =>
              KeyValueStored(key, value, replyTo)
          }
          Behaviors.same
        case Get(key, replyTo) =>
          replyTo.tell(
            storage.get(key).map(Retrieved).getOrElse(Missing(key))
          )
          Behaviors.same
        case KeyValueFailed(key, _, replyTo) =>
          replyTo.tell(Failed(key))
          Behaviors.same
        case KeyValueStored(key, value, replyTo) =>
          replyTo.tell(Stored(key))
          apply(storage + (key -> value))
      }
    }

  @nowarn
  def saveToDatabase(key: String, value: String)(implicit ec: ExecutionContext): Future[Done] =
    Future {...}
}

How to keep messages while performing costly operations?

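What happens when a client stores a key-value pair and immediately afterwards tries to retrieve that key? The Get may be processed before the database write has completed, so the client gets Missing (or the previous value).

Additionally, there is a race when updating the same key. The in-memory map keeps whichever write completes last, which is not necessarily the last Put received.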

How to keep messages while performing costly operations?

import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.actor.typed.scaladsl.Behaviors
import akka.Done

import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.annotation.nowarn

object TransactionalKeyValueStore {

  sealed trait PutResponse
  case class Stored(key: String) extends PutResponse
  case class Failed(key: String) extends PutResponse

  sealed trait GetResponse
  case class Retrieved(value: String) extends GetResponse
  case class Missing(key: String)     extends GetResponse

  sealed trait Command
  case class Put(key: String, value: String, replyTo: ActorRef[PutResponse]) extends Command
  case class Get(key: String, replyTo: ActorRef[GetResponse])                extends Command

  private case class KeyValueStored(key: String, value: String, replyTo: ActorRef[PutResponse])
      extends Command
  private case class KeyValueFailed(key: String, value: String, replyTo: ActorRef[PutResponse])
      extends Command

  def apply(storage: Map[String, String] = Map.empty): Behavior[Command] =
    Behaviors.setup { context =>
      implicit val ec: ExecutionContext =
        context.system.dispatchers.lookup(DispatcherSelector.blocking())
      Behaviors.receiveMessage {
        case Put(key, value, replyTo) =>
          val saved = saveToDatabase(key, value)
          context.pipeToSelf(saved) {
            case Failure(exception) =>
              context.log.error("Error Saving to DB", exception)
              KeyValueFailed(key, value, replyTo)
            case Success(_) =>
              KeyValueStored(key, value, replyTo)
          }
          saving(storage)
        case Get(key, replyTo) =>
          replyTo.tell(
            storage.get(key).map(Retrieved).getOrElse(Missing(key))
          )
          Behaviors.same
        case msg =>
          context.log.error("Received wrong message {}", msg)
          Behaviors.unhandled
      }
    }

  def saving(storage: Map[String, String]): Behavior[Command] =
    Behaviors.withStash(30) { buffer =>
      Behaviors.receiveMessage {
        case KeyValueFailed(key, _, replyTo) =>
          replyTo.tell(Failed(key))
          // Keep the previous storage: a failed write must not wipe the in-memory state
          buffer.unstashAll(apply(storage))
        case KeyValueStored(key, value, replyTo) =>
          replyTo.tell(Stored(key))
          buffer.unstashAll(apply(storage + (key -> value)))
        case msg =>
          buffer.stash(msg)
          Behaviors.same
      }
    }

  @nowarn
  def saveToDatabase(key: String, value: String)(implicit ec: ExecutionContext): Future[Done] =
    Future {...}
}

What is Akka Persistence about?

What it is not: it's not a database framework!

Akka Persistence enables an Actor to store its state for later retrieval, but it shouldn't be seen as an ORM-like (JPA, Hibernate, etc.) library.

What is Akka Persistence about?

Akka Persistence provides mechanisms to persist the events generated by an Actor, so that the Actor can recreate its internal state later on, for example after a failure or a restart.

It's a great match for applications using Event Sourcing.

How to build an Event Sourced App?

We need to understand the difference between Commands and Events.

Commands are the messages an Actor receives. They have been neither validated nor persisted yet.

After successful validation, a Command is converted into an Event, which represents a change to apply to the Actor's internal State.
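To make the distinction concrete, here is a plain Scala sketch with no Akka involved. Validation turns a hypothetical `Put` command into either a rejection or a `KeyValuePut` event. The names and size limits mirror the key-value store example that follows, but the `validate` function itself is only illustrative.

```scala
object CommandToEvent {
  // Hypothetical Command: a request the actor received, not validated yet.
  final case class Put(key: String, value: String)
  // Hypothetical Event: a validated fact describing what changed.
  final case class KeyValuePut(key: String, value: String)

  val MaxKeyLength = 100
  val MaxValueLength = 500

  // Validation decides whether a Command becomes an Event or is rejected.
  def validate(command: Put): Either[String, KeyValuePut] =
    if (command.key.length > MaxKeyLength || command.value.length > MaxValueLength)
      Left("Either key or value exceed maximum size")
    else
      Right(KeyValuePut(command.key, command.value))
}
```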

How to build an Event Sourced App?

An actor is only allowed to update its state after an Event has been successfully persisted.

How to build an Event Sourced App?

import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect

object PersistentKeyValueStore {

  sealed trait PutResponse                       extends CborSerializable
  case class Stored(key: String)                 extends PutResponse
  case class Failed(key: String, reason: String) extends PutResponse

  sealed trait GetResponse            extends CborSerializable
  case class Retrieved(value: String) extends GetResponse
  case class Missing(key: String)     extends GetResponse

  sealed trait Command                                                       extends CborSerializable
  case class Put(key: String, value: String, replyTo: ActorRef[PutResponse]) extends Command
  case class Get(key: String, replyTo: ActorRef[GetResponse])                extends Command

  sealed trait Event                                 extends CborSerializable
  case class KeyValuePut(key: String, value: String) extends Event

  final case class State(storage: Map[String, String]) extends CborSerializable

  object State {
    val empty = State(Map.empty)
  }

  def apply(id: String): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId("KeyValueStore", id),
      emptyState = State.empty,
      commandHandler = handleCommand,
      eventHandler = handleEvent
    )

  def handleCommand(state: State, command: Command): Effect[Event, State] =
    command match {
      case Put(key, value, replyTo) =>
        // Let's create some arbitrary rules for keys and values
        if (key.length() > 100 || value.length > 500) {
          // Validation didn't hold, we reply and don't persist this Event
          replyTo.tell(Failed(key, "Either key or value exceed maximum size"))
          Effect.none
        } else {
          // Validation succeeded, we transform the command to an Event, and we persist it.
          Effect.persist(KeyValuePut(key, value))
            .thenReply(replyTo)(_ => Stored(key))
        }
      case Get(key, replyTo) =>
        replyTo.tell(state.storage.get(key).map(Retrieved).getOrElse(Missing(key)))
        Effect.none
    }

  def handleEvent(state: State, event: Event): State =
    event match {
      case KeyValuePut(key, value) => State(state.storage + (key -> value))
    }
}

How to build an Event Sourced App?

While an Event is being persisted, a persistent actor (EventSourcedBehavior) automatically stashes incoming Commands and processes them once the Event has been stored.

On startup, a persistent actor automatically restores its state by replaying the Events stored for its persistenceId, if there are any.
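Conceptually, recovery is a left fold of the event handler over the stored events, starting from the empty state. The sketch below is a simplified plain Scala model of that idea, not Akka's actual recovery implementation. It also shows why the event handler should be a pure function without side effects: it runs again for every stored event on each recovery.

```scala
object Recovery {
  // Hypothetical types with the same shape as PersistentKeyValueStore.
  final case class KeyValuePut(key: String, value: String)
  final case class State(storage: Map[String, String])
  val emptyState: State = State(Map.empty)

  // Pure function of (State, Event), like handleEvent in the example above.
  def handleEvent(state: State, event: KeyValuePut): State =
    State(state.storage + (event.key -> event.value))

  // Recovery = applying the stored events, in sequence order, to the empty state.
  def recover(journal: Seq[KeyValuePut]): State =
    journal.foldLeft(emptyState)(handleEvent)
}
```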

Why should we save Snapshots?

What happens if we leave our previous key-value store running for a whole day? What will the event journal look like?

What if updating an existing key is the most common operation we have?
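Here is a rough simulation in plain Scala. It assumes a hypothetical workload of one Put per second over the same 10 keys for 24 hours. The journal grows to 86,400 events while the state still holds only 10 entries, and recovery without snapshots would have to replay every single event.

```scala
object OneDayOfUpdates {
  final case class KeyValuePut(key: String, value: String)

  val secondsPerDay: Int = 24 * 60 * 60 // 86,400
  val distinctKeys: Int = 10

  // Hypothetical workload: one Put per second, cycling over the same 10 keys.
  val journal: Vector[KeyValuePut] =
    Vector.tabulate(secondsPerDay)(i => KeyValuePut(s"key-${i % distinctKeys}", s"value-$i"))

  // The state only keeps the latest value per key.
  val state: Map[String, String] =
    journal.foldLeft(Map.empty[String, String])((m, e) => m + (e.key -> e.value))
}
```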

Why should we save Snapshots?

A Snapshot consists of the serialized representation of an Actor's internal State.

If an actor tends to accumulate long event logs, it may be worth generating State Snapshots periodically.

Why should we save Snapshots?

When recovering, a persistent actor first restores the latest Snapshot, and then replays only the events persisted after it.
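Below is a simplified model of snapshot-based recovery in plain Scala, not Akka's implementation. It assumes a hypothetical `Snapshot` that stores the state together with the sequence number of the last event it includes. Recovery starts from the snapshot and replays only the events after it. It must end in the same state as a full replay.

```scala
object SnapshotRecovery {
  final case class KeyValuePut(key: String, value: String)
  type State = Map[String, String]

  def handleEvent(state: State, e: KeyValuePut): State = state + (e.key -> e.value)

  // Hypothetical snapshot: the state after applying events 1..sequenceNr.
  final case class Snapshot(sequenceNr: Long, state: State)

  // Events are numbered 1..journal.size; only the events after the snapshot are replayed.
  // Returns the recovered state and how many events had to be replayed.
  def recover(snapshot: Option[Snapshot], journal: Vector[KeyValuePut]): (State, Int) = {
    val (start, alreadyApplied) = snapshot match {
      case Some(s) => (s.state, s.sequenceNr.toInt)
      case None    => (Map.empty[String, String], 0)
    }
    val toReplay = journal.drop(alreadyApplied)
    (toReplay.foldLeft(start)(handleEvent), toReplay.size)
  }
}
```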

Why should we save Snapshots?

You can choose how often to take Snapshots according to your business needs and the characteristics of your workload.

Why should we save Snapshots?

import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, RetentionCriteria}
import akka.persistence.typed.PersistenceId

object PersistentKeyValueStoreWithSnapshots {

  sealed trait PutResponse                       extends CborSerializable
  case class Stored(key: String)                 extends PutResponse
  case class Failed(key: String, reason: String) extends PutResponse

  sealed trait GetResponse            extends CborSerializable
  case class Retrieved(value: String) extends GetResponse
  case class Missing(key: String)     extends GetResponse

  sealed trait Command                                                       extends CborSerializable
  case class Put(key: String, value: String, replyTo: ActorRef[PutResponse]) extends Command
  case class Get(key: String, replyTo: ActorRef[GetResponse])                extends Command

  sealed trait Event                                 extends CborSerializable
  case class KeyValuePut(key: String, value: String) extends Event

  final case class State(storage: Map[String, String]) extends CborSerializable

  object State {
    val empty = State(Map.empty)
  }

  def apply(id: String): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId("KeyValueStore", id),
      emptyState = State.empty,
      commandHandler = handleCommand,
      eventHandler = handleEvent
    )
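      // Alternative: snapshot whenever a custom predicate over (state, event, sequenceNr) holds
      // .snapshotWhen((state, event, sequenceNumber) => sequenceNumber % 100 == 0)
      // Snapshot every 100 events and keep only the 2 most recent snapshots
      .withRetention(RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2))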

  def handleCommand(state: State, command: Command): Effect[Event, State] =
    command match {
      case Put(key, value, replyTo) =>
        // Let's create some arbitrary rules for keys and values
        if (key.length() > 100 || value.length > 500) {
          // Validation didn't hold, we reply and don't persist this Event
          replyTo.tell(Failed(key, "Either key or value exceed maximum size"))
          Effect.none
        } else {
          // Validation succeeded, we transform the command to an Event, and we persist it.
          Effect.persist(KeyValuePut(key, value)).thenReply(replyTo)(_ => Stored(key))
        }
      case Get(key, replyTo) =>
        replyTo.tell(state.storage.get(key).map(Retrieved).getOrElse(Missing(key)))
        Effect.none
    }

  def handleEvent(state: State, event: Event): State =
    event match {
      case KeyValuePut(key, value) => State(state.storage + (key -> value))
    }
}

Why should we save Snapshots?

Pick the right serializer and do not use Java Serialization: it's slow and prone to security vulnerabilities.
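The examples above extend a `CborSerializable` marker trait that is not shown in the slides. Below is a minimal sketch of what it presumably looks like: an empty trait whose fully qualified name is bound to the `jackson-cbor` serializer in the configuration. Using that serializer requires the akka-serialization-jackson module. The two case classes are only illustrative.

```scala
// In a real project this file would start with the package whose fully qualified
// name is bound in the configuration, i.e. io.github.jlprat.akka.lnl.persistence.typed

// Marker trait: it carries no behavior, it only tells Akka's serialization-bindings
// which serializer to use for every Command, Event, reply and State that extends it.
trait CborSerializable

// Illustrative event and state opting in to Jackson CBOR serialization.
final case class KeyValuePut(key: String, value: String) extends CborSerializable
final case class State(storage: Map[String, String])     extends CborSerializable
```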

Why should we save Snapshots?

Test configuration: in-memory journal and local (file system) snapshot store

akka {
  actor {
    serialization-bindings {
      "io.github.jlprat.akka.lnl.persistence.typed.CborSerializable" = jackson-cbor
    }
  }

  persistence {
    # inmem only for tests
    journal.plugin = "akka.persistence.journal.inmem"
    snapshot-store.plugin = "akka.persistence.snapshot-store.local"
    snapshot-store.local.dir = "target/snapshot"
  }
}

Bonus: What if my events evolve?

Akka Persistence offers mechanisms to migrate between different schema versions of your Events. One of them is implementing your own EventAdapter, which converts events from one version to another.
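Below is a minimal sketch of an Akka Persistence Typed `EventAdapter`. It assumes a hypothetical schema evolution in which a second version of the event adds an optional `ttlSeconds` field. Old events are still read as `KeyValuePutV1` and upgraded on the fly in `fromJournal`. New events are always written as `KeyValuePutV2`. In a real application, the journal types also need a serializer binding, for example by extending `CborSerializable`.

```scala
import akka.persistence.typed.{EventAdapter, EventSeq}

// Hypothetical journal representations: V1 is what older versions persisted.
sealed trait JournalEvent
final case class KeyValuePutV1(key: String, value: String)                             extends JournalEvent
final case class KeyValuePutV2(key: String, value: String, ttlSeconds: Option[Long]) extends JournalEvent

// Hypothetical current domain event used by the EventSourcedBehavior.
final case class KeyValuePut(key: String, value: String, ttlSeconds: Option[Long])

class KeyValueEventAdapter extends EventAdapter[KeyValuePut, JournalEvent] {
  // New events are always written in the latest schema.
  override def toJournal(e: KeyValuePut): JournalEvent =
    KeyValuePutV2(e.key, e.value, e.ttlSeconds)

  // Stored alongside the event and handed back to fromJournal.
  override def manifest(event: KeyValuePut): String = "v2"

  // Old events are upgraded while reading; V1 had no TTL, so it becomes None.
  override def fromJournal(p: JournalEvent, manifest: String): EventSeq[KeyValuePut] =
    p match {
      case KeyValuePutV1(k, v)      => EventSeq.single(KeyValuePut(k, v, None))
      case KeyValuePutV2(k, v, ttl) => EventSeq.single(KeyValuePut(k, v, ttl))
    }
}
```

The adapter is registered on the behavior with `EventSourcedBehavior(...).eventAdapter(new KeyValueEventAdapter)`.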

Thanks! And leave feedback!

