Futures in actor

Tweet about this on TwitterShare on LinkedInShare on FacebookShare on Google+Share on Reddit

Future and Actor are two different paradigms to express concurrent computations. Both of them are perfectly valid abstractions. However one must be careful when it comes to mixing them up.

The scenario I have in mind is when an Actor must use a service that returns a Future in its API. The service might look something similar to this:

import scala.concurrent.Future
import scala.util.Random

object RandomService {
   import scala.concurrent.ExecutionContext.Implicits.global
   /** Generates a random Int **/
   def performComputation: Future[Int] = Future {
      val number = Random.nextInt(5)
      // artificially wait to simulate a longer computation
      Thread.sleep(number.toLong + 1)
      number
   }
}

This service defines a single method that returns a Future. It doesn’t matter what the service does – it can fetch data from a database, trigger a remote call or perform an intensive computation, … – the thing that matters is that it returns a Future.

Also notice that in this example the service imports the scala global execution context. It means that this service will use a different execution context than the one in the actor. This is usually the case (e.g. a service that defines its own thread pool like database connections) and this is a good thing too because we don’t want to block the actor’s thread to perform a long computation.

Now that we have a service let’s create an Actor that calls this service every time it receives a request message.

import akka.Actor
import scala.concurrent.Future

object FutureActor {
   /** the protocol messages to interact with this actor **/
   case object Request  // client sends a Request message ...
   case object Response // ... and expects a Response back

   // message used to get the actor's internal state during tests
   case object GetState 

   /** the state of the actor: The number of requests received and processed so far **/
   case class State(received: Int = 0, processed: Int = 0)
}

class FutureActor extends Actor {
   import FutureActor._
   import context.dispatcher
   
   // internal actor state (number of received and processed requests)
   var state = State()

   def receive = {
      case Request =>
         // update state
         state = state.copy(received = state.received + 1)
         // call service
         RandomService.performComputation.foreach { _ =>
            // processing done -> update state
            state = state.copy(processed = state.processed + 1)
            // reply to sender
            sender() ! Response
         }
      case GetState => sender() ! state
   }
}

Nothing too complex here. We have an actor that keeps track of how many requests it as received and processed so far. This is not really important what the actor keeps track of. The important thing is that the actor has a state and it updates its state when the future completes successfully.

Now that we have an actor let’s write a test for it:

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.testkit.{TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest.concurrent.{PatienceConfiguration, ScalaFutures}
import org.scalatest.{BeforeAndAfterAll, Matchers, WorkSpecLike}
import scala.concurrent.Future
import scala.concurrent.duration._

class FutureActorSpec 
      extends TestKit(ActorSystem("FutureActorSpec")) 
      with WordSpecLike 
      with Matchers 
      with ScalaFutures 
      with BeforeAndAfterAll {
   import system.dispatcher
   import FutureActor._
   implicit val timeout = Timeout(10.seconds)
   implicit val patience = PatienceConfiguration.timeout(10.seconds)

   override def afterAll() = {
      system.shutdownActorSystem(system)
      super.afterAll()
   }

   "FutureActor" should {
      "process all received messages" in {
         val ref = TestActorRef[FutureActor]
         Future.traverse(1 to 100)(_ => ref ? Request).futureValue
         // check all requests have been processed
         val actorState = (ref ? GetState).futureValue
         actorState.received shouldBe 100
         actorState.processed shouldBe 100
      }
   }
}

Our test looks pretty simple: Send 100 requests to the actor and wait for the responses.
Once we got all the responses, ask the actor for its state and check that both the received and processed counters equal 100.

So now let’s run it and … What? Test failed because of a timeout. How come? We’ve explicitly increased the default timeouts to 10s which is more than enough to process 100 requests.

mmmhh …our test looks ok but keeps failing. It might be something in our actor implementation … we seem to never receive some responses back. Let’s see what can happen…

Our random service computation might fail in which case the “foreach” is never called. Correct! That would be a good practice to add a recover block and make sure we deal with failures. However our random service implementation is simple enough and there were no such failures during the test.

The problem is far more subtle and is because the Future and Actor paradigms have been mixed together without careful consideration.

Remember! An actor processes one message a time. Therefore we can assume a “single-threaded” environment when we process the messages. This is what we’ve done here. The actor update its state by mutating the “state” property and this is perfectly fine (as long as the actor assumption holds).

Now let’s think how the actor interacts with the RandomService. It uses a Future and perform some actions when the Future completes. But wait! How does Future concurrency work? … yes it uses different assumptions. We have now turn our nice “single-threaded view” into a multi-threaded one. When the Future completes, our actor is no longer processing the message that triggered the service but has jump to another message.

That means that the call to sender() when the future completes no longer points to the sender who sent the original message but to the sender of the current message being processed by the actor. And that is why our future timed out (behind the scene the ask pattern creates an actor to wait for the reply and in our case we were sending some replies to the wrong actors, therefore some “ask” actors never got their reply and our future timed out).

We also have similar concurrency issues when updating the actor state as we are just mutating a var (which is fine in a “single-threaded” env but is subject to nasty bugs in a multi-threaded world).

In fact this case is so frequent that akka provides us with a pattern to address this: The pipe pattern.

The pipe pattern allows to send a message to an actor when a Future completes. We can use the pipe pattern to send the response to ourself and update the actor’s state safely under the “single-threaded” view of the actor paradigm.

Our actor code becomes:

import akka.Actor
import akka.pattern.pipe
import scala.concurrent.Future

object FutureActor {
   /** the protocol messages to interact with this actor **/
   case object Request  // client sends a Request message ...
   case object Response // ... and expects a Response back
   case object GetState // message used to get the actor's internal state during tests

   // a new message that the actor sends to himself when the future completes
   case class ServiceResponse(number: Int) 

   /** the state of the actor: The number of requests received and processed so far **/
   case class State(received: Int = 0, processed: Int = 0)
}

class FutureActor extends Actor {
   import FutureActor._
   import context.dispatcher
   
   // internal actor state (number of received and processed requests)
   var state = State()

   def receive = {
      case Request =>
         // update state
         state = state.copy(received = state.received + 1)
         RandomService.performComputation // call service
           .map(ServiceResponse.apply)    // turn response into ServiceResponse
           .pipeTo(self)(sender())        // sends it back to itself
      case ServiceResponse(_) =>
        // processing done -> safely update state
        state = state.copy(processed = state.processed + 1)
        // and notify sender that request has been processed 
        sender() ! Response
      case GetState => sender() ! state
   }
}

What we’ve changed now is that when the Future completes the response is turned into a ServiceResponse message.
This map operation is safe (no side-effect) and then the message is sent to the actor itself. (We also use the pipe pattern to preserve the sender reference).

We can then add a new case to the receive block to update the actor state and notify the correct sender.

Now if we run our test … it works just fine. But keep in mind that the remark about service failure still holds and we have to handle the failure ourselves. The pipe pattern does no magic here.