We are familiar with this initialization pattern:
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
/** Typed actor that rejects every `Primes` request received before `Initialize`. */
object DropBeforeInit {

  /** Replies sent to the `replyTo` of a `Primes` command. */
  sealed trait Status
  case class Done(primes: List[Int]) extends Status
  case object Processing extends Status
  case object Discarded extends Status

  /** Messages understood by this actor. */
  sealed trait Command
  case object Initialize extends Command
  case class Primes(numberOfPrimes: Int, replyTo: ActorRef[Status]) extends Command

  /** Lazy sieve: emits the head of `s`, then sieves the tail with every multiple of that head removed. */
  def primeStream(s: LazyList[Int]): LazyList[Int] =
    s.head #:: primeStream(s.tail.filter(candidate => candidate % s.head != 0))

  /**
    * Behavior before initialization: `Initialize` switches to [[initialized]],
    * any `Primes` request is logged and answered with `Discarded`.
    */
  def apply(): Behavior[Command] =
    Behaviors.receive { (context, message) =>
      message match {
        case Initialize =>
          context.log.info("Initializing - doing some costly things")
          initialized()
        case Primes(numberOfPrimes, replyTo) =>
          context.log.error("Request to calculate {} primes was discarded", numberOfPrimes)
          replyTo ! Discarded
          Behaviors.same
      }
    }

  /**
    * Behavior after initialization: answers each `Primes` request with `Processing`
    * followed by `Done`; a second `Initialize` throws `IllegalStateException`.
    */
  def initialized(): Behavior[Command] =
    Behaviors.receiveMessage {
      case Initialize =>
        throw new IllegalStateException("Already initialized")
      case Primes(numberOfPrimes, replyTo) =>
        replyTo ! Processing
        // The primes are only materialized here, after `Processing` was sent.
        val primes = primeStream(LazyList.from(2)).take(numberOfPrimes).toList
        replyTo ! Done(primes)
        Behaviors.same
    }
}
What if we would like to keep all those discarded messages? In the end, it's not the client's fault that we weren't initialized yet!
import akka.actor.{Actor, ActorLogging, Props, Stash}
import io.github.jlprat.akka.lnl.stash.classic.StashBeforeInit._
/** Protocol and factory for the classic `StashBeforeInit` actor. */
object StashBeforeInit {

  /** Replies sent back to the sender of a `Primes` request. */
  sealed trait Status
  case class Done(primes: List[Int]) extends Status
  case object Processing extends Status

  /** Messages understood by the actor. */
  sealed trait Command
  case object Initialize extends Command
  case class Primes(numberOfPrimes: Int) extends Command

  /** `Props` used to create a `StashBeforeInit` actor. */
  def props(): Props = Props(classOf[StashBeforeInit])

  /** Lazy sieve: emits the head of `s`, then sieves the tail with every multiple of that head removed. */
  private def primeStream(s: LazyList[Int]): LazyList[Int] =
    s.head #:: primeStream(s.tail.filter(candidate => candidate % s.head != 0))
}
/** Classic actor that stashes `Primes` requests until it receives `Initialize`. */
class StashBeforeInit extends Actor with Stash with ActorLogging {

  /** Not yet initialized: requests are stashed, `Initialize` releases them and switches behavior. */
  override def receive: Actor.Receive = {
    case Primes(numberOfPrimes) =>
      log.info("Stashing request to calculate {} number of primes", numberOfPrimes)
      stash()
    case Initialize =>
      log.info("Initializing - doing some costly things")
      unstashAll()
      context.become(initialized)
  }

  /** Initialized: every request is answered with `Processing` and then `Done`. */
  def initialized: Actor.Receive = {
    case Primes(numberOfPrimes) =>
      val requester = sender()
      requester ! Processing
      // The result could be cached in a field; it is recomputed on purpose
      // so that every request performs some costly work.
      val primes = primeStream(LazyList.from(2)).take(numberOfPrimes).toList
      requester ! Done(primes)
    case Initialize =>
      throw new IllegalStateException("Already initialized")
  }
}
The size of the Stash
is determined by the stash-capacity
setting of the mailbox’s
configuration.
Be aware that the Stash
trait should be mixed in before any other trait that overrides the
preStart
hook.
Also, Stash
only works with deque-based (double-ended queue) mailboxes.
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
/** Typed actor that stashes `Primes` requests until it receives `Initialize`. */
object StashBeforeInit {

  /** Replies sent to the `replyTo` of a `Primes` command. */
  sealed trait Status
  case class Done(primes: List[Int]) extends Status
  case object Processing extends Status

  /** Messages understood by this actor. */
  sealed trait Command
  case object Initialize extends Command
  case class Primes(numberOfPrimes: Int, replyTo: ActorRef[Status]) extends Command

  /** Lazy sieve: emits the head of `s`, then sieves the tail with every multiple of that head removed. */
  def primeStream(s: LazyList[Int]): LazyList[Int] =
    s.head #:: primeStream(s.tail.filter(candidate => candidate % s.head != 0))

  /**
    * Behavior before initialization, backed by a stash of capacity 25:
    * requests are stashed, `Initialize` replays all of them into [[initialized]].
    */
  def apply(): Behavior[Command] =
    Behaviors.withStash(25) { stashBuffer =>
      Behaviors.receive { (context, message) =>
        message match {
          case Initialize =>
            context.log.info("Initializing - doing some costly things")
            stashBuffer.unstashAll(initialized())
          case request @ Primes(numberOfPrimes, _) =>
            context.log.info("Stashing request to calculate {} number of primes", numberOfPrimes)
            stashBuffer.stash(request)
            Behaviors.same
        }
      }
    }

  /**
    * Behavior after initialization: answers each request with `Processing`
    * followed by `Done`; a second `Initialize` throws `IllegalStateException`.
    */
  def initialized(): Behavior[Command] =
    Behaviors.receiveMessage {
      case Initialize =>
        throw new IllegalStateException("Already initialized")
      case Primes(numberOfPrimes, replyTo) =>
        replyTo ! Processing
        // The result could be cached in a field; it is recomputed on purpose
        // so that every request performs some costly work.
        val primes = primeStream(LazyList.from(2)).take(numberOfPrimes).toList
        replyTo ! Done(primes)
        Behaviors.same
    }
}
When unstashing, messages are prepended to the mailbox. This means that after calling unstash
,
the given actor will first process the unstashed messages and only then any other messages in the mailbox.
What might happen if we stashed 300 messages, and each message takes 200 ms to process?
This means that for 1 full minute we won't be able to process any new incoming message, leaving the clients wondering if the app is responding at all.
What can we do?
If you use Akka Classic you are unlucky. The only thing one could do is to configure the stash size to be small, so the time unstashing remains manageable.
If you use Akka Typed though, you are lucky!
Instead of unstashing all messages, one can unstash in smaller chunks, so incoming messages can be processed between batches.
import akka.actor.typed.{ActorRef, ActorSystem, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.AskPattern._
import akka.util.Timeout
import scala.concurrent.duration._
import scala.io.StdIn
/**
  * Typed actor that stashes `Primes` requests until `Initialize` arrives and then
  * replays the stash in batches of five, so that messages received in the meantime
  * are handled between batches.
  */
object ChunkedUnstash {

  /** Reply carrying the calculated primes. */
  case class Done(primes: List[Int])

  /** Messages understood by this actor. */
  sealed trait Command
  case object Initialize extends Command
  case class Primes(numberOfPrimes: Int, replyTo: ActorRef[Done]) extends Command
  // Self-sent marker that triggers the next batch of unstashing.
  private case object ResumeUnstash extends Command
  // Wrapper applied to every message taken out of the stash.
  private case class Stashed(msg: Command) extends Command

  /** Lazy sieve: emits the head of `s`, then sieves the tail with every multiple of that head removed. */
  def primeStream(s: LazyList[Int]): LazyList[Int] =
    s.head #:: primeStream(s.tail.filter(candidate => candidate % s.head != 0))

  /** Starts uninitialized with a stash of capacity 300. */
  def apply(): Behavior[Command] =
    Behaviors.withStash(300) { stashBuffer =>

      // Enqueues the marker for the following batch, then replays up to five stashed messages.
      def unstashNextBatch(self: ActorRef[Command]): Behavior[Command] = {
        self ! ResumeUnstash
        stashBuffer.unstash(initialized(), 5, Stashed)
      }

      // Calculates the requested primes and sends them to the requester.
      def answer(numberOfPrimes: Int, replyTo: ActorRef[Done]): Behavior[Command] = {
        val primes = primeStream(LazyList.from(2)).take(numberOfPrimes).toList
        replyTo ! Done(primes)
        Behaviors.same
      }

      def uninitialized(): Behavior[Command] =
        Behaviors.receive { (context, message) =>
          message match {
            case Initialize =>
              context.log.info("Initializing - doing some costly things")
              if (stashBuffer.nonEmpty) unstashNextBatch(context.self)
              else initialized()
            case request @ Primes(numberOfPrimes, _) =>
              context.log.info("Stashing request to calculate {} primes", numberOfPrimes)
              stashBuffer.stash(request)
              Behaviors.same
            case _ =>
              Behaviors.unhandled
          }
        }

      def initialized(): Behavior[Command] =
        Behaviors.receive { (context, message) =>
          message match {
            case Stashed(Primes(numberOfPrimes, replyTo)) =>
              context.log.info("Processing a previously stashed message")
              answer(numberOfPrimes, replyTo)
            case Primes(numberOfPrimes, replyTo) =>
              context.log.info("Processing a fresh new message")
              answer(numberOfPrimes, replyTo)
            case ResumeUnstash if stashBuffer.nonEmpty =>
              context.log.info("Finished one batch of unstashing")
              unstashNextBatch(context.self)
            case ResumeUnstash =>
              context.log.info("Finished Unstashing")
              Behaviors.same
            // Only `Primes` is ever stashed, so any other wrapped message is a bug.
            case Stashed(unexpected) =>
              context.log.error("Wrong message in stash {}", unexpected)
              throw new IllegalStateException("Wrong message in stash")
            case Initialize =>
              throw new IllegalStateException("Already initialized")
          }
        }

      uninitialized()
    }
}
Go and run ChunkedUnstash
.
Logs should show that messages received while unstashing are not processed last.
Use case: we have some costly writes. Until a write has been successfully persisted, we should not process potentially conflicting operations.
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.actor.typed.scaladsl.Behaviors
import akka.Done
import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.annotation.nowarn
/**
  * In-memory key-value store whose writes go through an asynchronous database save.
  * The actor keeps handling messages while a save is in flight.
  */
object SimpleKeyValueStore {

  /** Replies to a `Put`. */
  sealed trait PutResponse
  case class Stored(key: String) extends PutResponse
  case class Failed(key: String) extends PutResponse

  /** Replies to a `Get`. */
  sealed trait GetResponse
  case class Retrieved(value: String) extends GetResponse
  case class Missing(key: String) extends GetResponse

  /** Messages understood by this actor. */
  sealed trait Command
  case class Put(key: String, value: String, replyTo: ActorRef[PutResponse]) extends Command
  case class Get(key: String, replyTo: ActorRef[GetResponse]) extends Command
  // Internal messages carrying the outcome of the database save back to the actor.
  private case class KeyValueStored(key: String, value: String, replyTo: ActorRef[PutResponse])
      extends Command
  private case class KeyValueFailed(key: String, value: String, replyTo: ActorRef[PutResponse])
      extends Command

  /**
    * Behavior serving reads from `storage`; a key only becomes visible once the
    * database save for it has completed successfully.
    *
    * @param storage pairs stored so far
    */
  def apply(storage: Map[String, String] = Map.empty): Behavior[Command] =
    Behaviors.setup { context =>
      implicit val ec: ExecutionContext =
        context.system.dispatchers.lookup(DispatcherSelector.blocking())

      Behaviors.receiveMessage {
        case Get(key, replyTo) =>
          val response: GetResponse = storage.get(key) match {
            case Some(value) => Retrieved(value)
            case None        => Missing(key)
          }
          replyTo ! response
          Behaviors.same
        case Put(key, value, replyTo) =>
          // The outcome of the save is delivered back to this actor as a message.
          context.pipeToSelf(saveToDatabase(key, value)) {
            case Success(_) =>
              KeyValueStored(key, value, replyTo)
            case Failure(exception) =>
              context.log.error("Error Saving to DB", exception)
              KeyValueFailed(key, value, replyTo)
          }
          Behaviors.same
        case KeyValueStored(key, value, replyTo) =>
          replyTo ! Stored(key)
          apply(storage + (key -> value))
        case KeyValueFailed(key, _, replyTo) =>
          replyTo ! Failed(key)
          Behaviors.same
      }
    }

  /** Saves the pair asynchronously on `ec`; the body is elided in this listing. */
  @nowarn
  def saveToDatabase(key: String, value: String)(implicit ec: ExecutionContext): Future[Done] =
    Future {...}
}
What happens when a client saves a key-value pair and, immediately afterwards, attempts to retrieve that key?
Additionally, there is a race while updating the same key.
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.actor.typed.scaladsl.Behaviors
import akka.Done
import scala.concurrent.{blocking, ExecutionContext, Future}
import scala.util.{Failure, Success}
import scala.annotation.nowarn
/**
  * In-memory key-value store that handles one write at a time: while a database
  * save is in flight, every other command is stashed and replayed once the save
  * has completed.
  */
object TransactionalKeyValueStore {

  /** Replies to a `Put`. */
  sealed trait PutResponse
  case class Stored(key: String) extends PutResponse
  case class Failed(key: String) extends PutResponse

  /** Replies to a `Get`. */
  sealed trait GetResponse
  case class Retrieved(value: String) extends GetResponse
  case class Missing(key: String) extends GetResponse

  /** Messages understood by this actor. */
  sealed trait Command
  case class Put(key: String, value: String, replyTo: ActorRef[PutResponse]) extends Command
  case class Get(key: String, replyTo: ActorRef[GetResponse]) extends Command
  // Internal messages carrying the outcome of the database save back to the actor.
  private case class KeyValueStored(key: String, value: String, replyTo: ActorRef[PutResponse])
      extends Command
  private case class KeyValueFailed(key: String, value: String, replyTo: ActorRef[PutResponse])
      extends Command

  /**
    * Idle behavior: reads are served from `storage`, a `Put` starts a database save
    * and switches to [[saving]] until the outcome arrives.
    *
    * @param storage pairs stored so far
    */
  def apply(storage: Map[String, String] = Map.empty): Behavior[Command] =
    Behaviors.setup { context =>
      implicit val ec: ExecutionContext =
        context.system.dispatchers.lookup(DispatcherSelector.blocking())
      Behaviors.receiveMessage {
        case Put(key, value, replyTo) =>
          val saved = saveToDatabase(key, value)
          // The outcome of the save is delivered back to this actor as a message.
          context.pipeToSelf(saved) {
            case Failure(exception) =>
              context.log.error("Error Saving to DB", exception)
              KeyValueFailed(key, value, replyTo)
            case Success(_) =>
              KeyValueStored(key, value, replyTo)
          }
          saving(storage)
        case Get(key, replyTo) =>
          replyTo.tell(
            storage.get(key).map(Retrieved).getOrElse(Missing(key))
          )
          Behaviors.same
        case msg =>
          // Save outcomes are only expected while in the `saving` behavior.
          context.log.error("Received wrong message {}", msg)
          Behaviors.unhandled
      }
    }

  /**
    * Behavior while a save is in flight: commands are stashed (capacity 30) and
    * replayed into the idle behavior once the outcome of the save is known.
    *
    * @param storage pairs stored before the in-flight write
    */
  def saving(storage: Map[String, String]): Behavior[Command] =
    Behaviors.withStash(30) { buffer =>
      Behaviors.receiveMessage {
        case KeyValueFailed(key, _, replyTo) =>
          replyTo.tell(Failed(key))
          // A failed write must leave the existing contents untouched: continue with
          // the current storage instead of restarting from an empty map.
          buffer.unstashAll(apply(storage))
        case KeyValueStored(key, value, replyTo) =>
          replyTo.tell(Stored(key))
          buffer.unstashAll(apply(storage + (key -> value)))
        case msg =>
          buffer.stash(msg)
          Behaviors.same
      }
    }

  /** Saves the pair asynchronously on `ec`; the body is elided in this listing. */
  @nowarn
  def saveToDatabase(key: String, value: String)(implicit ec: ExecutionContext): Future[Done] =
    Future {...}
}
What it is not: It's not a DB framework!
Akka Persistence enables an Actor to store its state for later retrieval, but it shouldn't be seen as an ORM-like (JPA, Hibernate, etc.) library.
Akka Persistence provides mechanisms to persist the events generated by an Actor, so that in case of failure it can recreate its internal state at a later point.
It's a great match for applications using Event Sourcing.
We need to understand the difference between Commands and Events.
Commands are the received messages of an Actor. They are not yet validated nor persisted.
After successful validation, a Command is converted to an Event, which represents the actions to perform to the Actor's inner State.
Only after successfully persisting an Event is the actor allowed to update its state.
import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.scaladsl.EventSourcedBehavior
import akka.persistence.typed.PersistenceId
import akka.persistence.typed.scaladsl.Effect
/** Event-sourced key-value store: every accepted `Put` is persisted as a `KeyValuePut` event. */
object PersistentKeyValueStore {

  /** Replies to a `Put`. */
  sealed trait PutResponse extends CborSerializable
  case class Stored(key: String) extends PutResponse
  case class Failed(key: String, reason: String) extends PutResponse

  /** Replies to a `Get`. */
  sealed trait GetResponse extends CborSerializable
  case class Retrieved(value: String) extends GetResponse
  case class Missing(key: String) extends GetResponse

  /** Incoming messages; not yet validated nor persisted. */
  sealed trait Command extends CborSerializable
  case class Put(key: String, value: String, replyTo: ActorRef[PutResponse]) extends Command
  case class Get(key: String, replyTo: ActorRef[GetResponse]) extends Command

  /** Persisted facts from which the state is built. */
  sealed trait Event extends CborSerializable
  case class KeyValuePut(key: String, value: String) extends Event

  /** Actor state: the current contents of the store. */
  final case class State(storage: Map[String, String]) extends CborSerializable
  object State {
    val empty = State(Map.empty)
  }

  /**
    * Creates the event-sourced behavior for the store identified by `id`.
    *
    * @param id entity id, combined with the "KeyValueStore" type hint into the persistence id
    */
  def apply(id: String): Behavior[Command] =
    EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId("KeyValueStore", id),
      emptyState = State.empty,
      commandHandler = handleCommand,
      eventHandler = handleEvent
    )

  /**
    * Validates a command and decides what to persist.
    *
    * @return an effect persisting `KeyValuePut` for a valid `Put`, no effect otherwise
    */
  def handleCommand(state: State, command: Command): Effect[Event, State] =
    command match {
      // Let's create some arbitrary rules for keys and values.
      case Put(key, value, replyTo) if key.length() > 100 || value.length > 500 =>
        // Validation didn't hold: reply with the failure and persist nothing.
        replyTo ! Failed(key, "Either key or value exceed maximum size")
        Effect.none
      case Put(key, value, replyTo) =>
        // Validation succeeded: turn the command into an event and persist it.
        Effect
          .persist(KeyValuePut(key, value))
          .thenReply(replyTo)(_ => Stored(key))
      case Get(key, replyTo) =>
        val response: GetResponse = state.storage.get(key) match {
          case Some(value) => Retrieved(value)
          case None        => Missing(key)
        }
        replyTo ! response
        Effect.none
    }

  /** Applies an event to the state, returning the updated state. */
  def handleEvent(state: State, event: Event): State =
    event match {
      case KeyValuePut(key, value) => state.copy(storage = state.storage.updated(key, value))
    }
}
A Persistent Actor will automatically stash any incoming Command while persisting any Event.
An Actor automatically restores its state on startup if there are any events for the given persistence ID.
What happens if we leave our previous Key-Value-Store running for one day? What will the storage look like?
What if updating a previously introduced key is the most common action we have?
A Snapshot consists of the serialized representation of an Actor's internal State.
If an actor tends to accumulate long Event Logs, it might be worth generating State Snapshots periodically.
When recovering, a Persistent Actor will first attempt to restore the latest Snapshot, and then, replay all subsequent events.
You can determine the right periodicity of Snapshot generation according to your business needs and specificities.
import akka.actor.typed.{ActorRef, Behavior}
import akka.persistence.typed.scaladsl.{Effect, EventSourcedBehavior, RetentionCriteria}
import akka.persistence.typed.PersistenceId
/**
  * Event-sourced key-value store that additionally takes a snapshot every
  * 100 events and keeps the two most recent snapshots.
  */
object PersistentKeyValueStoreWithSnapshots {

  /** Replies to a `Put`. */
  sealed trait PutResponse extends CborSerializable
  case class Stored(key: String) extends PutResponse
  case class Failed(key: String, reason: String) extends PutResponse

  /** Replies to a `Get`. */
  sealed trait GetResponse extends CborSerializable
  case class Retrieved(value: String) extends GetResponse
  case class Missing(key: String) extends GetResponse

  /** Incoming messages; not yet validated nor persisted. */
  sealed trait Command extends CborSerializable
  case class Put(key: String, value: String, replyTo: ActorRef[PutResponse]) extends Command
  case class Get(key: String, replyTo: ActorRef[GetResponse]) extends Command

  /** Persisted facts from which the state is built. */
  sealed trait Event extends CborSerializable
  case class KeyValuePut(key: String, value: String) extends Event

  /** Actor state: the current contents of the store. */
  final case class State(storage: Map[String, String]) extends CborSerializable
  object State {
    val empty = State(Map.empty)
  }

  /**
    * Creates the event-sourced behavior for the store identified by `id`.
    *
    * @param id entity id, combined with the "KeyValueStore" type hint into the persistence id
    */
  def apply(id: String): Behavior[Command] = {
    val behavior = EventSourcedBehavior[Command, Event, State](
      persistenceId = PersistenceId("KeyValueStore", id),
      emptyState = State.empty,
      commandHandler = handleCommand,
      eventHandler = handleEvent
    )
    // Alternative, predicate-based trigger:
    //.snapshotWhen((state, event, sequenceNumber) => sequenceNumber % 100 == 0)
    behavior.withRetention(
      RetentionCriteria.snapshotEvery(numberOfEvents = 100, keepNSnapshots = 2)
    )
  }

  /**
    * Validates a command and decides what to persist.
    *
    * @return an effect persisting `KeyValuePut` for a valid `Put`, no effect otherwise
    */
  def handleCommand(state: State, command: Command): Effect[Event, State] =
    command match {
      case Get(key, replyTo) =>
        val found = state.storage.get(key)
        replyTo.tell(found.fold[GetResponse](Missing(key))(Retrieved))
        Effect.none
      case Put(key, value, replyTo) =>
        // Let's create some arbitrary rules for keys and values.
        val tooLarge = key.length() > 100 || value.length > 500
        if (!tooLarge) {
          // Validation succeeded: turn the command into an event and persist it.
          Effect.persist(KeyValuePut(key, value)).thenReply(replyTo)(_ => Stored(key))
        } else {
          // Validation didn't hold: reply with the failure and persist nothing.
          replyTo.tell(Failed(key, "Either key or value exceed maximum size"))
          Effect.none
        }
    }

  /** Applies an event to the state, returning the updated state. */
  def handleEvent(state: State, event: Event): State =
    event match {
      case KeyValuePut(key, value) =>
        val updated = state.storage + (key -> value)
        State(updated)
    }
}
Pick the right serialization method and do not use Java Serialization, it's slow and buggy.
Test config for in-memory storage
# Test configuration: serialization binding plus journal and snapshot-store plugins.
akka {
actor {
# Messages, events and state extending CborSerializable use the jackson-cbor serializer.
serialization-bindings {
"io.github.jlprat.akka.lnl.persistence.typed.CborSerializable" = jackson-cbor
}
}
persistence {
# inmem only for tests
journal.plugin = "akka.persistence.journal.inmem"
# Snapshots use the local snapshot-store plugin, with target/snapshot as its directory.
snapshot-store.plugin = "akka.persistence.snapshot-store.local"
snapshot-store.local.dir = "target/snapshot"
}
}
Akka Persistence offers a mechanism to migrate between different schema versions of your Events. In order to do so, one
needs to implement their own version of an EventAdapter
, which will convert events from one version
to another