Future and Actor are two different paradigms for expressing concurrent computations. Both are perfectly valid abstractions. However, one must be careful when mixing them.
The scenario I have in mind is when an Actor must use a service that returns a Future in its API. The service might look something similar to this:
```scala
import scala.concurrent.Future
import scala.util.Random

object RandomService {
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Generates a random Int */
  def performComputation: Future[Int] = Future {
    val number = Random.nextInt(5)
    // artificially wait to simulate a longer computation
    Thread.sleep(number.toLong + 1)
    number
  }
}
```
This service defines a single method that returns a Future. It doesn’t matter what the service does – it could fetch data from a database, trigger a remote call, or perform an intensive computation – what matters is that it returns a Future.
Also notice that in this example the service imports the Scala global execution context, which means this service will use a different execution context than the actor’s. This is usually the case (e.g. a service that manages its own thread pool, like a database connection pool), and it is a good thing too, because we don’t want to block the actor’s thread to perform a long computation.
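As a quick sketch of that idea (the service name and pool size here are made up for illustration), a service that runs its computations on its own dedicated thread pool rather than on the caller’s execution context might look like this:

```scala
import java.util.concurrent.Executors

import scala.concurrent.{ExecutionContext, Future}

object PooledService {
  // the service owns a fixed-size pool; its futures never run
  // on the caller's (e.g. the actor's) dispatcher
  private val pool = Executors.newFixedThreadPool(4)
  private implicit val ec: ExecutionContext =
    ExecutionContext.fromExecutorService(pool)

  /** Some potentially long-running work, executed on the service's own pool */
  def compute(n: Int): Future[Int] = Future {
    n * 2
  }

  /** Release the pool threads when the service is no longer needed */
  def shutdown(): Unit = pool.shutdown()
}
```

A caller can then use the returned Future without ever touching the service’s threads; the pool size and shutdown hook are design choices the real service would make for itself.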
Now that we have a service let’s create an Actor that calls this service every time it receives a request message.
```scala
import akka.actor.Actor

object FutureActor {
  /** The protocol messages to interact with this actor */
  case object Request  // client sends a Request message ...
  case object Response // ... and expects a Response back

  // message used to get the actor's internal state during tests
  case object GetState

  /** The state of the actor: the number of requests received and processed so far */
  case class State(received: Int = 0, processed: Int = 0)
}

class FutureActor extends Actor {
  import FutureActor._
  import context.dispatcher

  // internal actor state (number of received and processed requests)
  var state = State()

  def receive = {
    case Request =>
      // update state
      state = state.copy(received = state.received + 1)
      // call service
      RandomService.performComputation.foreach { _ =>
        // processing done -> update state
        state = state.copy(processed = state.processed + 1)
        // reply to sender
        sender() ! Response
      }
    case GetState =>
      sender() ! state
  }
}
```
Nothing too complex here. We have an actor that keeps track of how many requests it has received and processed so far. What the actor keeps track of is not really important. The important thing is that the actor has a state and updates that state when the future completes successfully.
Now that we have an actor let’s write a test for it:
```scala
import akka.actor.ActorSystem
import akka.pattern.ask
import akka.testkit.{TestActorRef, TestKit}
import akka.util.Timeout
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.time.{Seconds, Span}
import org.scalatest.{BeforeAndAfterAll, Matchers, WordSpecLike}

import scala.concurrent.Future
import scala.concurrent.duration._

class FutureActorSpec extends TestKit(ActorSystem("FutureActorSpec"))
  with WordSpecLike with Matchers with ScalaFutures with BeforeAndAfterAll {

  import FutureActor._
  import system.dispatcher

  implicit val timeout: Timeout = Timeout(10.seconds)
  implicit override val patienceConfig: PatienceConfig =
    PatienceConfig(timeout = Span(10, Seconds))

  override def afterAll() = {
    TestKit.shutdownActorSystem(system)
    super.afterAll()
  }

  "FutureActor" should {
    "process all received messages" in {
      val ref = TestActorRef[FutureActor]
      // send 100 requests and wait for all the responses
      Future.traverse(1 to 100)(_ => ref ? Request).futureValue

      // check all requests have been processed
      val actorState = (ref ? GetState).mapTo[State].futureValue
      actorState.received shouldBe 100
      actorState.processed shouldBe 100
    }
  }
}
```
Our test looks pretty simple: Send 100 requests to the actor and wait for the responses.
Once we get all the responses, ask the actor for its state and check that both the received and processed counters equal 100.
So now let’s run it and … what? The test fails because of a timeout. How come? We’ve explicitly increased the default timeouts to 10 seconds, which is more than enough to process 100 requests.
Hmm … our test looks OK but keeps failing. It might be something in our actor implementation: some responses never seem to come back. Let’s see what can happen …
Our random service computation might fail, in which case the foreach callback is never called. Correct! It would be good practice to add a recover block and make sure we deal with failures. However, our random service implementation is simple enough and there were no such failures during the test.
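As a plain-Scala sketch of that recover idea (the `flaky` service and fallback value are made up for illustration), a recover block turns a failure into a fallback result instead of silently never invoking the success callback:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// a service call that always fails, standing in for a real failure
def flaky: Future[Int] = Future.failed(new RuntimeException("boom"))

// without recover, foreach on `flaky` would never run;
// with recover we get a fallback value we can act on
val safe: Future[Int] = flaky.recover { case _: RuntimeException => -1 }

Await.result(safe, 1.second) // -1
```

In the actor the fallback would typically be a dedicated failure message rather than a sentinel value, but the mechanism is the same.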
The problem is far more subtle: the Future and Actor paradigms have been mixed together without careful consideration.
Remember! An actor processes one message at a time. Therefore we can assume a “single-threaded” environment while processing a message. This is what we’ve done here: the actor updates its state by mutating the state property, and this is perfectly fine (as long as the actor assumption holds).
Now let’s think about how the actor interacts with the RandomService. It uses a Future and performs some actions when the Future completes. But wait! How does Future concurrency work? … yes, it uses different assumptions. We have now turned our nice “single-threaded” view into a multi-threaded one. When the Future completes, our actor is no longer processing the message that triggered the service call but has moved on to another message.
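A tiny plain-Scala sketch makes the thread switch visible: a callback registered on a Future runs on an execution-context thread, not on the thread that created the Future — which is exactly why the actor’s “single-threaded” assumption no longer holds inside the callback:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// the thread that creates the Future (in the actor case: the dispatcher
// thread currently processing the message)
val callerThread = Thread.currentThread().getName

// the callback records which thread it actually ran on
val callbackThread: Future[String] =
  Future(42).map(_ => Thread.currentThread().getName)

// the callback ran on a global execution-context worker, not on callerThread
Await.result(callbackThread, 1.second) != callerThread // true
```

With the global execution context the callback is submitted to a worker pool, so it never runs on the creating thread; inside an actor this means the callback races with whatever message the actor is processing by then.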
That means that the call to sender() when the future completes no longer points to the sender of the original message, but to the sender of whatever message the actor is currently processing. And that is why our future timed out: behind the scenes the ask pattern creates a temporary actor to wait for the reply, and in our case some replies went to the wrong actors, so some “ask” actors never got their reply and our future timed out.
We also have similar concurrency issues when updating the actor’s state, as we are just mutating a var (which is fine in a “single-threaded” environment but is subject to nasty bugs in a multi-threaded world).
In fact this case is so frequent that Akka provides a pattern to address it: the pipe pattern.
The pipe pattern allows us to send a message to an actor when a Future completes. We can use it to send the response back to the actor itself and then update the actor’s state safely, under the “single-threaded” view of the actor paradigm.
Our actor code becomes:
```scala
import akka.actor.Actor
import akka.pattern.pipe

object FutureActor {
  /** The protocol messages to interact with this actor */
  case object Request  // client sends a Request message ...
  case object Response // ... and expects a Response back

  // message used to get the actor's internal state during tests
  case object GetState

  // a new message that the actor sends to itself when the future completes
  case class ServiceResponse(number: Int)

  /** The state of the actor: the number of requests received and processed so far */
  case class State(received: Int = 0, processed: Int = 0)
}

class FutureActor extends Actor {
  import FutureActor._
  import context.dispatcher

  // internal actor state (number of received and processed requests)
  var state = State()

  def receive = {
    case Request =>
      // update state
      state = state.copy(received = state.received + 1)
      RandomService.performComputation // call service
        .map(ServiceResponse.apply)    // turn the result into a ServiceResponse
        .pipeTo(self)(sender())        // send it to ourselves, preserving the original sender
    case ServiceResponse(_) =>
      // processing done -> safely update state
      state = state.copy(processed = state.processed + 1)
      // and notify the original sender that the request has been processed
      sender() ! Response
    case GetState =>
      sender() ! state
  }
}
```
What we’ve changed is that when the Future completes, the response is turned into a ServiceResponse message. This map operation is safe (no side effects), and the message is then sent to the actor itself (we also use the pipe pattern to preserve the sender reference). We can then add a new case to the receive block to update the actor state and notify the correct sender.
Now if we run our test … it works just fine. But keep in mind that the remark about service failures still holds: we have to handle failures ourselves. The pipe pattern does no magic here — if the future fails, pipeTo delivers an akka.actor.Status.Failure message instead of our protocol message.
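One common approach — sketched here in plain Scala with a hypothetical `ServiceFailure` message; the real actor would pipe the result to self as above — is to map both outcomes into protocol messages before piping, so the resulting Future always succeeds and the actor only ever receives plain messages:

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

sealed trait ServiceResult
case class ServiceResponse(number: Int) extends ServiceResult
case class ServiceFailure(reason: String) extends ServiceResult

// turn both success and failure into protocol messages; the returned
// Future never fails, so pipeTo would never emit a Status.Failure
def toMessage(f: Future[Int]): Future[ServiceResult] =
  f.map(ServiceResponse.apply)
    .recover { case e => ServiceFailure(e.getMessage) }

Await.result(toMessage(Future.successful(3)), 1.second)                        // ServiceResponse(3)
Await.result(toMessage(Future.failed(new RuntimeException("boom"))), 1.second) // ServiceFailure(boom)
```

The actor then adds one more case for `ServiceFailure` and handles it under the same single-threaded guarantees as any other message.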