Akka actors fit nicely with DDD (Domain-Driven Design) when designing an application. For example, it’s quite natural to model entities as individual actors that can be persisted, …
One of the key aspects of DDD is the notion of a bounded context. A bounded context is simply a “self-contained” component. It can interact with other components but it is coherent on its own. Each bounded context has its own domain model which belongs only to itself and should neither leak into nor be influenced by other bounded contexts.
Anti-corruption layers (aka translation layers or adapter layers) are used to enforce this principle. Their role is basically to translate the core domain objects into/from another domain that is used for communication or persistence.
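As a minimal, Akka-free sketch (the `Core`, `Api` and `UserAcl` names here are made up for illustration and don’t come from the services below), an anti-corruption layer boils down to a pair of translation functions between two models:

```scala
// Core domain model: rich types, only used inside the bounded context.
object Core {
  final case class UserId(value: Long)
  final case class User(id: UserId, name: String)
}

// External (protocol) model: flat, serialisation-friendly.
object Api {
  final case class UserDto(id: Long, name: String)
}

// The anti-corruption layer: the only place where both models meet.
object UserAcl {
  def toApi(user: Core.User): Api.UserDto =
    Api.UserDto(user.id.value, user.name)

  def fromApi(dto: Api.UserDto): Core.User =
    Core.User(Core.UserId(dto.id), dto.name)
}
```

The core model can evolve freely (e.g. wrap the id in a richer type) without the external representation noticing, as long as the two translation functions are updated.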
In this blog post we’re going to try to follow the DDD principles to build a small (contrived) application using Akka and try to figure out the best way to build efficient anti-corruption layers.
The context
We’re going to build a tiny application with 2 services.
- Auth: The authentication service in charge of creating sessions and verifying tokens
- Blog: The blogging service which stores a list of posts
Then we have the App, which simulates a scenario involving these 2 services.
Each of these services is persistent in order to recover from failures, and doesn’t leak its domain model into the persistence layer or to the outside world (e.g. the App client).
Implementation
Both services follow the same implementation model and rely on 3 different domains:
- Model.scala defines the core domain. This is the model used by the entities to implement their business logic.
- Persistence.scala defines the model used to persist the events representing the actor state.
- Protocol.scala defines the model (or the protocol) used by the other components to communicate with the service.
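To make the three domains concrete, here is a plausible sketch of what they could contain for the Auth service — the fields are guessed for illustration, not taken from the actual repository:

```scala
// Core domain (Model.scala): rich types, only used inside the service.
object Model {
  final case class Token(value: String)
  final case class Authenticated(user: String, token: Token)
}

// Persistence model (Persistence.scala): flat, serialisation-friendly events.
object Persistence {
  final case class Authenticated(user: String, token: String)
}

// Protocol model (Protocol.scala): messages seen by other components.
object Protocol {
  final case class SessionStarted(user: String, token: String)
}

// A translation from core model to persistence model unwraps the rich types.
def toJournal(e: Model.Authenticated): Persistence.Authenticated =
  Persistence.Authenticated(e.user, e.token.value)
```

Note how `Model` is free to use wrapper types like `Token`, while the persistence and protocol models stay flat and boring on purpose.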
The domain model
Given that this is just a proof of concept focusing on the anti-corruption layers, there is little to say about the core services.
They are implemented as Akka actors and are the only place where the core model is used to implement the business logic.
What they do is not really important, but it’s worth noting that they call persist with domain objects. They also receive domain objects directly inside the receiveCommand method. Finally, sending messages to the outside world requires a call to translate to convert domain objects into protocol objects.
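The shape of that flow can be sketched without Akka — all names here are hypothetical, and `journal` and `outbox` merely stand in for `persist` and the outgoing ActorRef:

```scala
// Stand-in for Akka's Receive type.
type Receive = PartialFunction[Any, Unit]

object Model    { final case class Authenticate(user: String); final case class Authenticated(user: String) }
object Protocol { final case class SessionStarted(user: String) }

// The anti-corruption step used when replying to the outside world.
def translate(event: Model.Authenticated): Protocol.SessionStarted =
  Protocol.SessionStarted(event.user)

var journal = List.empty[Any] // stands in for persist(...)
var outbox  = List.empty[Any] // stands in for actorRef ! ...

// receiveCommand works purely on domain objects; translation happens on the way out.
val receiveCommand: Receive = {
  case Model.Authenticate(user) =>
    val event = Model.Authenticated(user)
    journal = journal :+ event            // persisted as a *domain* object
    outbox  = outbox :+ translate(event)  // sent out as a *protocol* object
}

receiveCommand(Model.Authenticate("alice"))
```

The point of the sketch is the asymmetry: the journal side will get its translation from an EventAdapter (next section), while the outgoing side needs an explicit `translate` call.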
The persistence layer
The persistence layer is implemented in 2 steps:
- An Akka EventAdapter ensures the translation between the domain model and the persistence model
- An Akka Serializer ensures the serialisation from the persistence model into the protobuf binary format
Fluent
The translation between the domain model and the persistence model is done using fluent. It makes the translation super-simple as we just have to specify which class gets transformed into which one.
import akka.persistence.journal.{EventAdapter, EventSeq}
import cats.instances.option._
import fluent._

class AuthEventAdapter extends EventAdapter {
  override def manifest(event: Any): String = event.getClass.getName

  override def fromJournal(event: Any, manifest: String): EventSeq = event match {
    case e: Persistence.Authenticated => EventSeq(e.transformTo[Model.Authenticated])
    case e: Persistence.Terminated    => EventSeq(e.transformTo[Model.Invalidated])
  }

  override def toJournal(event: Any): Any = event match {
    case e: Model.Authenticated => e.transformTo[Persistence.Authenticated]
    case e: Model.Invalidated   => e.transformTo[Persistence.Terminated]
  }
}
PBDirect
The serialisation into protobuf is handled by pbdirect. It allows transforming the persistence model case classes directly into protobuf without having to declare the same data structures in a .proto file.
import antikkor.example.auth.Persistence.{Authenticated, Terminated}
import antikkor.serialization.PBAkkaSerializer
import cats.instances.option._
import pbdirect._

class PBAuthSerializer extends PBAkkaSerializer {
  override def identifier: Int = 1340982252

  override def serialize: PartialFunction[AnyRef, Array[Byte]] = {
    case event: Authenticated => event.toPB
    case event: Terminated    => event.toPB
  }

  override def unserializeTo: PartialFunction[(Class[_], Array[Byte]), AnyRef] = {
    case (claSS, bytes) if claSS == classOf[Authenticated] => bytes.pbTo[Authenticated]
    case (claSS, bytes) if claSS == classOf[Terminated]    => bytes.pbTo[Terminated]
  }
}
As you can see, PBAuthSerializer doesn’t extend the Akka Serializer directly. Instead it inherits from PBAkkaSerializer, which checks whether the implementation defines a serialisation/deserialisation for a given class.
import akka.serialization.Serializer

trait PBAkkaSerializer extends Serializer {
  def serialize: PartialFunction[AnyRef, Array[Byte]]
  def unserializeTo: PartialFunction[(Class[_], Array[Byte]), AnyRef]

  override def toBinary(o: AnyRef): Array[Byte] =
    if (serialize.isDefinedAt(o)) serialize.apply(o)
    else throw new IllegalArgumentException(s"Can't serialize ${o.getClass.getName}")

  override def includeManifest: Boolean = true

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = manifest match {
    case Some(claSS) if unserializeTo.isDefinedAt((claSS, bytes)) =>
      unserializeTo.apply((claSS, bytes))
    case Some(claSS) =>
      throw new IllegalArgumentException(s"Don't know how to deserialize to ${claSS.getName} in ${this.getClass.getName}")
    case None =>
      throw new IllegalArgumentException("Need a protobuf serializable class")
  }
}
Note that if you use several serializers (typically one per service) you need to make sure their identifiers are unique, otherwise you might be asked to deserialise unexpected messages.
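For reference, serializers and their bindings are wired up in the Akka configuration. A sketch could look like the following — the fully-qualified class names are guesses based on the imports above, and the blog-side serializer is entirely hypothetical:

```
akka {
  actor {
    serializers {
      # one serializer per service, each with a unique identifier
      auth-pb = "antikkor.example.auth.PBAuthSerializer"
      blog-pb = "antikkor.example.blog.PBBlogSerializer"
    }
    serialization-bindings {
      "antikkor.example.auth.Persistence$Authenticated" = auth-pb
      "antikkor.example.auth.Persistence$Terminated"    = auth-pb
    }
  }
}
```

If two serializers shared an identifier, Akka could route bytes produced by one service to the other’s deserialiser, which is exactly the “unexpected messages” problem mentioned above.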
The translation layer
Similarly the translation layer is in charge of the translation between the domain model and the protocol model used to communicate with the outside world.
The implementation relies on fluent as well and looks similar to the persistence layer’s EventAdapter.
import akka.AdapterActor
import cats.instances.option._
import fluent._

trait AuthAdapterActor extends AdapterActor {
  override protected def translate: PartialFunction[Any, Any] = {
    case message: Protocol.StartSession => message.transformTo[Model.Authenticate]
    case message: Protocol.Verify       => message.transformTo[Model.Verify]
    case message: Protocol.EndSession   => message.transformTo[Model.Invalidate]
    case message: Model.Authenticated   => message.transformTo[Protocol.SessionStarted]
    case message: Model.Verified        => message.transformTo[Protocol.SessionVerified]
    case message: Model.Invalidated     => message.transformTo[Protocol.SessionEnded]
    case message: Model.InvalidUser     => message.transformTo[Protocol.InvalidUser]
    case message: Model.InvalidToken    => message.transformTo[Protocol.InvalidSession]
  }
}
However this translation doesn’t fit into any of the Akka components.
One possible solution might be to implement a proxy actor in front of the business domain actor. The proxy actor would be in charge of translating messages into the appropriate model and forwarding them to the intended actor.
Here I tried another approach: define a wrapper using the aroundReceive method to intercept messages and translate them before they are delivered to the actor’s receive method. This is exactly what the AdapterActor does.
import akka.actor.Actor

trait AdapterActor extends Actor {
  protected def translate: PartialFunction[Any, Any]

  override protected[akka] def aroundReceive(receive: Receive, msg: Any): Unit = {
    val adaptedReceive =
      if (translate.isDefinedAt(msg)) translate andThen receive
      else receive
    super.aroundReceive(adaptedReceive, msg)
  }
}
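The dispatch logic above can be exercised with plain PartialFunctions, outside of Akka — `translate` here is a toy rule, not one of the service translations:

```scala
// Collects everything the "actor" receives.
val received = scala.collection.mutable.ListBuffer.empty[Any]

// Toy translation rule, standing in for the Protocol -> Model translations.
val translate: PartialFunction[Any, Any] = {
  case s: String => s.toUpperCase
}

// Stand-in for the actor's receive method.
val receive: PartialFunction[Any, Unit] = {
  case msg => received += msg
}

// Same dispatch as aroundReceive: translate first when a rule exists, else pass through.
def deliver(msg: Any): Unit = {
  val adapted = if (translate.isDefinedAt(msg)) translate andThen receive else receive
  adapted(msg)
}

deliver("hello") // translated before delivery
deliver(42)      // no translation rule: delivered untouched
```

The `isDefinedAt` guard matters: `translate andThen receive` alone would throw a MatchError on messages with no translation rule, whereas here they fall through to `receive` unchanged.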
It works just fine for the incoming messages. On the other hand, sending messages is performed directly by the ActorRef representing the destination actor. As it’s not possible to override the ! (tell) method on the ActorRef, there is no easy way to automate the translation for outgoing messages.
It’s possible to define another operator for “translate and tell”, but I found that actorRef ! translate(message) is clear and explicit enough (compared to another “obscure” operator).
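Both styles can be compared with a small Akka-free sketch — `FakeRef` and the `!>` operator are made up for illustration:

```scala
// Minimal stand-in for an ActorRef that records what it is told.
final class FakeRef {
  var sent = List.empty[Any]
  def !(msg: Any): Unit = sent = sent :+ msg
}

object Model    { final case class Invalidated(user: String) }
object Protocol { final case class SessionEnded(user: String) }

val translate: PartialFunction[Any, Any] = {
  case Model.Invalidated(user) => Protocol.SessionEnded(user)
}

val ref = new FakeRef

// The explicit style the post settles on: translate, then tell.
ref ! translate(Model.Invalidated("alice"))

// The rejected alternative: a "translate and tell" operator.
implicit class TranslatingRef(wrapped: FakeRef) {
  def !>(msg: Any): Unit = wrapped ! translate(msg)
}
ref !> Model.Invalidated("bob")
```

Both calls put the same protocol message on the wire; the first just makes the translation step visible at the call site, which is the whole argument for it.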
Conclusion
The bright side
The core business model is contained inside the core actor and doesn’t leak into the persistence layer or to the outside world.
The translation mechanics are hidden from the core and live in specific classes (or traits), which makes the code well organised and easy to navigate. The PBDirect and fluent libraries make it easy to weave things together.
The pitfalls
Implicit resolution occurs at compile time, which is good because it minimises the runtime penalty, but it doesn’t play well with Akka, where the configuration is resolved at runtime (by reading the akka.conf file). Moreover the Akka APIs are defined as Any => Any or Any => Unit methods, which don’t play well with implicit resolution, as it relies on type parameters.
It might be worth having a look at Akka Typed to see how the “translation” behaviour could be implemented “around” another behaviour.
As usual you can find the code on GitHub. Let me know what you think, or how you implement your anti-corruption layers in Akka, in the comments below.