Synchronous computation in an Akka actor

The problem

Last time we saw how to deal with futures inside an actor. However, sometimes you just need to wait for a future to complete before processing the next message.

Naive solutions (don’t do it)

One obvious solution is to block the actor’s thread until the computation completes. As obvious as it is, it is not a good solution because it can lead to deadlocks. Just imagine that all the actors are waiting on a future that never returns because some resource is unavailable: all the actors are stuck, and no more messages can be processed.
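The failure mode can be reproduced without Akka. The sketch below is a plain-Scala illustration, assuming a dispatcher modelled as a fixed pool with exactly as many threads as there are blocked tasks (`BlockingStarvation` and `run` are hypothetical names). Each task blocks its thread while waiting for an inner future that needs a free thread from the same pool, so none of the inner futures can start. Only the timeout passed to `Await.result` lets the tasks give up; without it they would wait forever.

```scala
import java.util.concurrent.{ Executors, TimeoutException }
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.Try

object BlockingStarvation {
  // Starts `workers` tasks on a pool that has exactly `workers` threads.
  // Each task blocks its thread while waiting for an inner future that also
  // needs a thread from the same pool. Returns, per task, whether the wait
  // ended with a TimeoutException (i.e. the inner future never got to run).
  def run(workers: Int, timeout: FiniteDuration): Seq[Boolean] = {
    val pool = Executors.newFixedThreadPool(workers)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val tasks = (1 to workers).map { i =>
        Future {
          val inner = Future(i * 10)        // queued: every pool thread is busy
          Try(Await.result(inner, timeout)) // blocks the current pool thread
        }
      }
      val results = Await.result(Future.sequence(tasks), timeout + 5.seconds)
      results.map(_.failed.toOption.exists(_.isInstanceOf[TimeoutException]))
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit =
    // Every task starves, so this prints true for each of them.
    println(run(workers = 2, timeout = 500.millis))
}
```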

But if we don’t block, the actor automatically moves on to the next message before the computation completes. Another idea is to have the actor resend incoming messages to itself until the computation completes, so that no message is actually processed before the future completes.

That is certainly better than blocking, but we lose the message ordering guarantee. Earlier messages may be resent to the actor itself, which enqueues them at the end of the mailbox, potentially after later messages that haven’t been resent yet. That may or may not be an issue depending on your use case.
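A minimal model of the resend approach, assuming the mailbox is a plain FIFO queue (`ResendToSelf` and `processingOrder` are hypothetical names, not Akka APIs). While the computation is still running, each dequeued message is "sent to self", which appends it to the end of the queue:

```scala
import scala.collection.mutable

object ResendToSelf {
  // Models a mailbox as a FIFO queue. While the computation is still running
  // (the first `busyTurns` dequeues), the actor does not process the message:
  // it re-sends it to itself, which appends it to the END of the mailbox.
  def processingOrder(mailbox: Seq[String], busyTurns: Int): List[String] = {
    val queue = mutable.Queue(mailbox: _*)
    val processed = mutable.ListBuffer.empty[String]
    var turn = 0
    while (queue.nonEmpty) {
      val message = queue.dequeue()
      if (turn < busyTurns) queue.enqueue(message) // like "self ! message"
      else processed += message                    // computation done: handle it
      turn += 1
    }
    processed.toList
  }

  def main(args: Array[String]): Unit =
    // m1 arrived first, but m3 is handled first once the computation completes.
    println(processingOrder(Seq("m1", "m2", "m3"), busyTurns = 2)) // List(m3, m1, m2)
}
```

With a computation that is still running during the first two dequeues, m3 is handled before m1 and m2 even though it arrived last.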

Still, it would be much better to have a solution that preserves message ordering. (It turns out there is one, called stashing, but I’ll come back to it later.)

Digression: Akka Persistence

I kept that in the back of my mind, and it suddenly clicked when I was looking at some persistent actor code.

[Figure: Akka Persistence architecture overview]

Akka Persistence is a framework that lets a stateful actor store its state in persistent storage. The actor state is not stored directly (except when snapshotting); instead, every single event that led to the current state is persisted. This technique is known as event sourcing. The actor state can then be reconstructed by replaying every persisted event, in order. Because only events are persisted, append-only storage engines can be used, which allows for high throughput.
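Replaying events is essentially a left fold over the journal. The sketch below assumes a hypothetical bank-account state with `Deposited` and `Withdrawn` events (none of these types come from Akka Persistence): `updateState` applies one event, and recovery folds the journal in order, starting from the empty state.

```scala
object EventSourcingReplay {
  // Hypothetical events for a bank account; not part of Akka Persistence.
  sealed trait Event
  final case class Deposited(amount: Int) extends Event
  final case class Withdrawn(amount: Int) extends Event

  final case class Account(balance: Int)

  // Applies one event to the current state: the only place the state changes.
  def updateState(state: Account, event: Event): Account = event match {
    case Deposited(amount) => state.copy(balance = state.balance + amount)
    case Withdrawn(amount) => state.copy(balance = state.balance - amount)
  }

  // Recovery: replay the persisted journal, in order, from the empty state.
  def replay(journal: Seq[Event]): Account =
    journal.foldLeft(Account(0))(updateState)

  def main(args: Array[String]): Unit =
    println(replay(Seq(Deposited(100), Withdrawn(30), Deposited(5)))) // Account(75)
}
```

Recovering from a snapshot would amount to starting the fold from the snapshotted state instead of `Account(0)` and replaying only the events persisted after it.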

For more information, head to the Akka Persistence documentation.

To use (classic) Akka Persistence you extend PersistentActor, which requires you to implement a receiveCommand method (alongside receiveRecover and persistenceId). A typical receiveCommand looks like this:

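def receiveCommand: Receive = {
   case command =>
      // validate the command against the current state (apply is not shown)
      apply(command) match {
         case Success(event) =>
            // the callback runs once the event has been written to the journal
            persist(event) { persisted =>
               updateState(persisted)
               sender() ! persisted
            }
         case Failure(error) =>
            log.error(error, s"Failed to apply $command")
      }
}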

This code snippet receives a command and applies it (the apply method is not shown, but it probably checks the command’s validity against the current state), then calls the persist method. persist is provided by the PersistentActor we extended: it takes the event to persist and a callback that is invoked once the event has been persisted.
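The article does not show apply, so the sketch below is only a guess at what it might look like, using hypothetical `Deposit`/`Withdraw` commands and `Deposited`/`Withdrawn` events. It checks a command against the current balance and returns either the event to persist or a failure, without touching the state itself.

```scala
import scala.util.{ Failure, Success, Try }

object CommandValidation {
  // Hypothetical commands and events; the article does not define them.
  sealed trait Command
  final case class Deposit(amount: Int) extends Command
  final case class Withdraw(amount: Int) extends Command

  sealed trait Event
  final case class Deposited(amount: Int) extends Event
  final case class Withdrawn(amount: Int) extends Event

  // Validates `command` against the current balance. On success it returns the
  // event to persist; it never changes the balance itself (that is the job of
  // updateState, once the event has been persisted).
  def apply(balance: Int, command: Command): Try[Event] = command match {
    case Deposit(amount) if amount > 0 =>
      Success(Deposited(amount))
    case Withdraw(amount) if amount > 0 && amount <= balance =>
      Success(Withdrawn(amount))
    case rejected =>
      Failure(new IllegalArgumentException(s"$rejected rejected with balance $balance"))
  }
}
```

Keeping validation free of side effects means a rejected command leaves nothing to undo: only a persisted event ever reaches updateState.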

If you look at the documentation, it says that the callback is executed within the actor (so it can safely access the actor’s state and sender()) and that no new commands are processed between the call to persist and the execution of the callback.

Well, that’s exactly what we tried to do: make sure a computation completes before processing any other messages.

The good thing is that Akka is open source and we can just check how persist is implemented.

Basically, it changes the actor’s behavior so that all incoming messages are redirected to an internal stash until the event has been persisted; the stashed messages are unstashed afterwards.
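The stashing mechanism can be modelled without Akka. In this sketch (`StashModel` and the `Done` message are hypothetical), a message for which `startsComputation` returns true switches the actor into a waiting mode. While waiting, every other message is stashed, and when `Done` arrives the stash is put back at the front of the mailbox, as Akka’s unstashAll() does.

```scala
import scala.collection.mutable

object StashModel {
  // Hypothetical message that the completed computation "pipes" back to the actor.
  val Done = "computation-done"

  // Returns the order in which messages are handled. Done is consumed while
  // waiting and is not recorded as handled.
  def processingOrder(mailbox: Seq[String], startsComputation: String => Boolean): List[String] = {
    val queue = mutable.Queue(mailbox: _*)
    val stash = mutable.ListBuffer.empty[String]
    val handled = mutable.ListBuffer.empty[String]
    var waiting = false

    while (queue.nonEmpty) {
      val message = queue.dequeue()
      if (!waiting) {
        handled += message
        // like context.become(waiting(...)): wait for Done before going on
        if (startsComputation(message)) waiting = true
      } else if (message == Done) {
        // like unstashAll(): stashed messages go back to the FRONT of the
        // mailbox, in the order they were stashed, ahead of newer messages
        val pending = stash.toList ++ queue.toList
        stash.clear()
        queue.clear()
        queue ++= pending
        waiting = false
      } else {
        stash += message // like stash(): keep it for later
      }
    }
    handled.toList
  }

  def main(args: Array[String]): Unit =
    println(processingOrder(Seq("m1", "m2", "m3", Done, "m4"), _ == "m1")) // List(m1, m2, m3, m4)
}
```

With the mailbox m1, m2, m3, Done, m4, where m1 starts the computation, the messages are handled as m1, m2, m3, m4: unlike the resend approach, the original order survives.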

The solution

It turns out we can add a stash to any actor by mixing in the Stash trait. Using the same technique, we can make sure a computation completes before the next message is processed, while preserving message order and without blocking.

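import akka.actor.{ Actor, Stash }
import akka.pattern.pipe
import scala.concurrent.Future
import scala.util.{ Failure, Success, Try }

object SyncActor {
  // Messages used to hand the outcome of the future back to the actor
  case class ComputationSucceeded[T](result: T)
  case class ComputationFailed(throwable: Throwable)
}

trait SyncActor extends Actor with Stash {
  import SyncActor._
  import context.dispatcher

  // Runs `handler` once `computation` completes; until then every other
  // message is stashed, so messages keep their order and nothing blocks.
  def onComplete[T](computation: Future[T])(handler: Try[T] => Unit): Unit = {
    computation
      .map(result => ComputationSucceeded(result))
      .recover { case throwable => ComputationFailed(throwable) }
      .pipeTo(self)

    waitForMessage {
      case ComputationSucceeded(result) =>
        // unchecked cast: the type parameter T is erased at runtime
        handler(Success(result.asInstanceOf[T]))
      case ComputationFailed(throwable) =>
        handler(Failure(throwable))
    }
  }

  // Pushes a behavior that only accepts messages matched by `handler`;
  // discardOld = false so that unbecome() restores the previous behavior
  def waitForMessage(handler: Receive): Unit =
    context.become(waiting(handler), discardOld = false)

  private def waiting(handler: Receive): Receive = {
    case message if handler.isDefinedAt(message) =>
      unstashAll()       // stashed messages go back to the front of the mailbox
      context.unbecome() // return to the behavior active before waiting
      handler(message)
    case _ => stash()    // not the message we are waiting for: keep it for later
  }
}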

Using SyncActor, we can then define our own persistent actor (or any actor that requires sequential processing of messages):

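import akka.actor.ActorLogging
import scala.util.{ Failure, Success }

class MyPersistentActor extends SyncActor with ActorLogging {
   // apply, save and updateState are not shown: apply validates a command,
   // save returns a Future that completes once the event has been stored
   def receive: Receive = {
      case command =>
         apply(command) match {
            case Success(event) =>
               // capture the sender now: when the handler runs, the current
               // message is the piped computation result, not the command
               val replyTo = sender()
               onComplete(save(event)) {
                  case Success(_) =>
                     updateState(event)
                     replyTo ! event
                  case Failure(error) =>
                     log.error(error, s"Failed to save $event")
               }
            case Failure(error) =>
               log.error(error, s"Failed to apply $command")
         }
   }
}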

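This example is deliberately simple and would need proper error handling in a real application. Note also that computation timeouts are not handled: it simply assumes that the future completes. If it never does, the actor stays in the waiting state and stashes every incoming message forever.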
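One way to address the timeout issue is to wrap the future before handing it to onComplete, so that it is guaranteed to complete. Within Akka, `akka.pattern.after` combined with `Future.firstCompletedOf` is a common way to do this; the sketch below stays dependency-free and uses a `java.util.concurrent.ScheduledExecutorService` instead. `FutureTimeout` and `withTimeout` are hypothetical names.

```scala
import java.util.concurrent.{ Executors, ScheduledExecutorService, TimeUnit, TimeoutException }
import scala.concurrent.{ Await, ExecutionContext, Future, Promise }
import scala.concurrent.duration._

object FutureTimeout {
  // Completes like `computation`, or fails with a TimeoutException if
  // `computation` has not completed within `limit`. Whichever side completes
  // the promise first wins; the other attempt is ignored by tryComplete/tryFailure.
  // Note: the underlying computation is NOT cancelled, only abandoned.
  def withTimeout[T](computation: Future[T], limit: FiniteDuration, scheduler: ScheduledExecutorService)
                    (implicit ec: ExecutionContext): Future[T] = {
    val promise = Promise[T]()
    val timer = scheduler.schedule(new Runnable {
      def run(): Unit = {
        promise.tryFailure(new TimeoutException(s"computation did not complete within $limit"))
        ()
      }
    }, limit.toMillis, TimeUnit.MILLISECONDS)
    computation.onComplete { result =>
      timer.cancel(false) // the timeout is no longer needed
      promise.tryComplete(result)
    }
    promise.future
  }

  def main(args: Array[String]): Unit = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    implicit val ec: ExecutionContext = ExecutionContext.global
    try {
      val never = Promise[Int]().future // stands in for a computation that never completes
      val guarded = withTimeout(never, 200.millis, scheduler)
      // prints the completed value, which is a Failure wrapping the TimeoutException
      println(Await.ready(guarded, 2.seconds).value)
    } finally scheduler.shutdown()
  }
}
```

In MyPersistentActor this would mean calling something like `onComplete(FutureTimeout.withTimeout(save(event), 5.seconds, scheduler)) { ... }`, where the duration and the scheduler are assumptions; a save that times out then reaches the Failure branch instead of leaving the actor stuck in the waiting state.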

Because this actor processes one message at a time, make sure it can keep up with the incoming message rate; otherwise the stash will keep growing (or overflow, if its capacity is bounded).