Akka persistence


The actor model allows us to write complex distributed applications by containing the mutable state inside an actor boundary. However with Akka this state is not persistent. If the actor dies and then restarts all its state is lost.

To address this problem Akka provides the Akka Persistence framework. Akka Persistence is an effective way to persist an actor’s state, but its integration needs to be well thought out as it can greatly impact your application design. It fits nicely with the actor model and distributed system design, but it is quite different from what a “more classic” application looks like.

In this post I am going to go over the different components of Akka Persistence and see how they influence the design choices. I’ll also try to cover some of the common pitfalls to avoid when building a distributed application with Akka Persistence.

Although Akka Persistence allows you to plug in various storage backends, in this post I mainly discuss using the Cassandra backend.

Persisting data

Akka Persistence is a way to persist data; it is not a way to access that data! The persisted data is serialised and saved as raw bytes, which means it is neither readable nor queryable.

It’s possible to query the metadata (persistence id, timestamps, sequence number, …) but not the data itself, as it is stored in a blob. If we look at the akka_journal table definition in Cassandra we can see that the application data is stored as a blob in the event field.

CREATE TABLE warehouse.akka_journal (
    persistence_id text,
    partition_nr bigint,
    sequence_nr bigint,
    timestamp timeuuid,
    timebucket text,
    event blob,
    event_manifest text,
    message blob,
    ser_id int,
    ser_manifest text,
    tag1 text,
    tag2 text,
    tag3 text,
    used boolean static,
    writer_uuid text,
    PRIMARY KEY ((persistence_id, partition_nr), sequence_nr, timestamp, timebucket)
)

Of course, it’s still possible to store the data somewhere else but that’s a different problem. We’ll come back to that later, but the key thing to understand is that these two concerns shouldn’t be mixed together.

It means that the actor that is persisting data with Akka Persistence shouldn’t try to write the same data into another table. This is just going to make the actor’s logic unnecessarily complicated (and probably hurt performance as well).

Imagine an actor that is persisting financial transactions. We certainly need this information stored elsewhere so that we can query it, audit it or even derive other information from it (account balances, …). None of these operations should be added to the persisting actor. They don’t belong there.

In fact all these other tables that we need are just different views (or queries) of the persisted data and they form the ‘read’ side of the data, while Akka Persistence provides the ‘write’ side of the data. We want to keep them separated from each other. The main reason is that they are likely to have very different load profiles.

For instance, our system can persist thousands of transactions per second (write intensive) but the read side is accessed much less frequently (e.g. only when a customer logs into their account or when a daily report is generated).

Separating read access from write access is known as the CQRS principle (Command Query Responsibility Segregation).

We’ll come back to the read side later on. For now let’s focus on what data needs to be persisted.
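To make the separation concrete, here is a minimal sketch in plain Scala (no Akka involved). The names TransactionRecorded, journal and project are hypothetical and only serve the illustration: the write side is an append-only list of events, and the read side is a queryable view derived from it.

```scala
object ReadSideSketch {
  // Hypothetical event type, used only for this illustration.
  final case class TransactionRecorded(accountId: String, amount: BigDecimal)

  // Write side: an append-only sequence of events (stands in for the journal).
  val journal: Vector[TransactionRecorded] = Vector(
    TransactionRecorded("acc-1", BigDecimal(100)),
    TransactionRecorded("acc-2", BigDecimal(50)),
    TransactionRecorded("acc-1", BigDecimal(-30))
  )

  // Read side: a view (balance per account) computed from the events.
  // It can be rebuilt at any time and lives apart from the write side.
  def project(events: Seq[TransactionRecorded]): Map[String, BigDecimal] =
    events.foldLeft(Map.empty[String, BigDecimal]) { (balances, e) =>
      val current = balances.getOrElse(e.accountId, BigDecimal(0))
      balances.updated(e.accountId, current + e.amount)
    }

  val balances: Map[String, BigDecimal] = project(journal)
}
```

The persisting side never needs to know about the balance view; any number of such views can be derived from the same events.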

Event sourcing

Akka persistence provides a way to persist the actor state so surely enough the persisted data is just the actor state.

That’s one way to do it but not always the most efficient one.

Let’s imagine that we have an actor that receives all the transactions of a given customer. Its state is the sequence of all the transactions executed over a day. If it persists its state for every transaction it receives, it will persist the same transactions many times. That is not efficient (but Akka Persistence allows it anyway as it doesn’t care what is persisted).

A more common pattern is to persist the events. In this example it means that the actor is persisting every single transaction. It can then rebuild its state (the daily list of transactions) by replaying all the persisted transactions.

Now it’s the recovery that is inefficient as the actor needs to replay all the events just to build up the list of the last day of transactions.

In practice the two approaches are often combined. The actor persists every single event and once in a while it persists its entire state (the snapshot). On recovery it needs to load the latest snapshot and all the events that occurred after this snapshot.

Persisting the events (and replaying them when needed) is known as event-sourcing.
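The recovery logic described above can be sketched in plain Scala. This is not Akka’s API, only an illustration of the idea; State, Persisted and recover are hypothetical names. The snapshot remembers the sequence number of the last event it includes, so only later events are replayed.

```scala
object RecoverySketch {
  final case class Transaction(id: Long, amount: BigDecimal)

  // Actor state plus the sequence number of the last event applied to it.
  final case class State(transactions: Vector[Transaction], lastSeqNr: Long)

  // An event as stored in the journal, together with its sequence number.
  final case class Persisted(seqNr: Long, event: Transaction)

  def applyEvent(state: State, p: Persisted): State =
    State(state.transactions :+ p.event, p.seqNr)

  // Start from the latest snapshot (if any) and replay only the newer events.
  def recover(snapshot: Option[State], journal: Seq[Persisted]): State = {
    val start = snapshot.getOrElse(State(Vector.empty, 0L))
    journal.filter(_.seqNr > start.lastSeqNr).foldLeft(start)(applyEvent)
  }
}
```

Recovering from a snapshot plus the remaining events yields the same state as replaying the whole journal, only with fewer events to process.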

In event sourcing there is a distinction between a command and an event. A command is a request to perform an action whereas the event is the outcome of that action.

Let’s consider an actor in charge of placing orders. It receives a PlaceOrder command, checks if all the products are available and, if so, persists and emits an OrderPlaced event.

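import akka.actor._
import akka.persistence._

// PlaceOrder (command), OrderPlaced (event) and checkAvailability
// are assumed to be defined elsewhere.
class OrderProcessor extends PersistentActor {
  override val persistenceId: String = "OrderProcessor"

  var turnover: Long = 0L

  // Handles commands: validate, persist the resulting event, then update the state.
  def receiveCommand: Receive = {
    case command: PlaceOrder =>
      if (checkAvailability(command)) {
        val orderPlaced = OrderPlaced(command)
        persist(orderPlaced) { event =>
          // Runs only once the event has been written to the journal.
          turnover = turnover + event.totalPrice
          sender() ! event
        }
      }
  }

  // Replays the persisted events on recovery to rebuild the state.
  def receiveRecover: Receive = {
    case event: OrderPlaced =>
      turnover = turnover + event.totalPrice
  }
}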

In event-sourcing only the events are persisted (and not the commands). It makes it easier to rebuild the system state as the events represent all the actions that already happened. The state can be updated directly from the events.

If the commands were persisted we’d need to re-apply the commands to recover the state (and if a command failed during the replay, but not when it was originally received, we’d end up with an inconsistent state).
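The difference can be sketched in plain Scala. The field names and the stock model below are assumptions made for the illustration (they are not the classes used by the OrderProcessor above): handling a command may fail, whereas applying an already accepted event never does.

```scala
object CommandEventSketch {
  // Hypothetical shapes for the command and the event.
  final case class PlaceOrder(product: String, quantity: Int)  // a request
  final case class OrderPlaced(product: String, quantity: Int) // a fact

  type Stock = Map[String, Int]

  // Command handling depends on the current state and can be rejected.
  def decide(stock: Stock, cmd: PlaceOrder): Either[String, OrderPlaced] =
    if (stock.getOrElse(cmd.product, 0) >= cmd.quantity)
      Right(OrderPlaced(cmd.product, cmd.quantity))
    else
      Left(s"not enough stock for ${cmd.product}")

  // Applying an event is a pure state update with no validation.
  def applyEvent(stock: Stock, e: OrderPlaced): Stock =
    stock.updated(e.product, stock.getOrElse(e.product, 0) - e.quantity)

  // Recovery only needs the events.
  def replay(initial: Stock, events: Seq[OrderPlaced]): Stock =
    events.foldLeft(initial)(applyEvent)
}
```

Because replay never calls decide, a change in the validation rules between two deployments cannot make recovery fail.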

The key point here is to really be careful about which data you persist. Calling persist is quite easy and it’s possible to persist anything serialisable.

Contrast this situation with a schema database (SQL, Cassandra, …) where you need to define your table schema, write your query, go through any ORM-layer (Object-Relational-Mapping) before actually storing any data.

Don’t go down this slippery road; think carefully about the persisted data. Ideally only the events should be persisted as it makes it easy to rebuild the actor state (along with the snapshot states). So always question why a piece of data needs to be persisted.

Serialisation

Now that we know which data to persist, let’s focus on the serialisation mechanism. In fact the serialisation mechanism isn’t really part of Akka-Persistence but provided by Akka-serialisation. It’s the same mechanism that is used when a message needs to be sent over the network (e.g. to another JVM).

It’s not mandatory to use serialisation. For instance if your persistent storage supports JSON it’s possible to store a JSON representation of your data and skip the serialisation. (That being said, serialising to/from JSON is certainly not the most efficient solution).

The default serialisation mechanism is the classic Java serialisation. The only advantage is that it works out of the box (and it’s probably the only reason why it’s the default option). The main drawbacks are that it is both slow and inefficient in terms of serialised data size, plus it doesn’t provide a way to deal with schema evolution. Even the official documentation makes it clear that you shouldn’t go to production with this serialisation option.

While the performance both in terms of speed and data size does matter, for me, the key-factor is the way we deal with schema evolution.

Why is it so important? Simply because as the application changes so does the data model of the persisted data.
It means that we need to be able to read different versions of our data model when an actor recovers.

During recovery the actor has to process different event versions
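To illustrate what reading different versions means, here is a sketch in plain Scala. It doesn’t use any serialisation library; the two event versions, the currency field and its "EUR" default are assumptions made up for the example. Older events are converted (“upcast”) to the current model before the actor sees them.

```scala
object SchemaEvolutionSketch {
  // Two hypothetical versions of the same event, as found in the journal.
  sealed trait StoredEvent
  final case class OrderPlacedV1(orderId: String, totalPrice: Long) extends StoredEvent
  final case class OrderPlacedV2(orderId: String, totalPrice: Long, currency: String) extends StoredEvent

  // The model the current application code works with.
  final case class OrderPlaced(orderId: String, totalPrice: Long, currency: String)

  // Convert any stored version to the current model.
  def upcast(stored: StoredEvent): OrderPlaced = stored match {
    case OrderPlacedV1(id, price)           => OrderPlaced(id, price, "EUR") // assumed default
    case OrderPlacedV2(id, price, currency) => OrderPlaced(id, price, currency)
  }

  // During recovery every stored event goes through the upcast.
  def recoverAll(journal: Seq[StoredEvent]): Seq[OrderPlaced] =
    journal.map(upcast)
}
```

With a schema-aware format, part of this work (such as default values for new fields) can be handled at the serialisation level rather than by hand.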

As of today the most appropriate solutions for serialisation seem to be Protobuf and Avro.

They are quite similar in terms of performance but the main difference is the way they deal with schema evolution. It’s definitely something you want to know before making a decision here. The subject is worth a blog post of its own and Martin Kleppmann already wrote a really good one on this.

The read side

If you read this far, it should be pretty clear how to persist data. However we also need to read or query this data which wasn’t possible so far.

For this purpose Akka-Persistence provides us with persistence queries. At the beginning I didn’t quite get it. A persistence query… is it a query that runs forever? Like a ‘SELECT’ statement that returns all the data and keeps waiting for new rows instead of terminating?

It turns out it’s pretty much what it is. Another way to look at it, is to see it as a stream of persisted events.

And in fact this is exactly what Akka-Persistence provides: an Akka-Stream of persisted events.

For a given persistence ID the event stream is totally ordered (no holes in the sequence numbers)

This is great: we can now subscribe to the events of a given persistence ID and write them into a dedicated table that’s going to support our queries.

The code looks something like this:

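import akka.actor.ActorSystem
import akka.persistence.query.EventEnvelope
import akka.stream.Materializer

implicit val system: ActorSystem = ...
implicit val materialiser: Materializer = ...

// CassandraReadJournal.instance stands for the Cassandra read journal. With Akka
// Persistence Query it is typically obtained via
// PersistenceQuery(system).readJournalFor[CassandraReadJournal](CassandraReadJournal.Identifier)
CassandraReadJournal
  .instance
  .eventsByPersistenceId("OrderProcessor", 0L, Long.MaxValue)
  .runForeach(saveEvent)

def saveEvent(envelope: EventEnvelope): Unit =
  envelope.event match {
    case orderPlaced: OrderPlaced =>
      // Store the orderPlaced in a table we can query ....
    case _ =>
      // Ignore any other event type
  }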

And we can use Akka-Streams to do pretty much anything (log, write, aggregate, …).

However we only get the events of a single actor (because of the single persistence ID). What if we need the events from different actors?

There are actually two other types of streams available:

  • allPersistenceIds: A stream of all the persistence IDs used
  • eventsByTag: A stream of all the events tagged with a given value

The first one makes you aware of all the persistence IDs available in the system. It might be useful if you need to dynamically subscribe to new ‘eventsByPersistenceId’ streams.

The second one is much more interesting as it lets you combine the events from multiple actors in a single stream. This is especially useful to perform aggregation.

As the events come from different actors there is no total order defined. We can’t tell whether event A-5 or B-13 will appear first in the stream.

However it requires that the events are tagged when persisted on the write side. It means the persistent actor should wrap the event in a ‘Tagged’ envelope:

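// Tagged (akka.persistence.journal.Tagged) takes the payload and a set of tags
persist(Tagged(orderPlaced, Set("b2bOrder"))) { _ =>
  // The handler receives the Tagged wrapper, so use the original event here
  turnover = turnover + orderPlaced.totalPrice
  sender() ! orderPlaced
}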

Alternatively it’s possible to put in place a ‘WriteEventAdapter’ that wraps the event in the ‘Tagged’ envelope. It avoids changing the persistent actor code and lets you plug in more (or different) behaviour as needed.

Now back to the read side:

CassandraReadJournal
  .instance
  .eventsByTag("b2bOrder")
  .runForeach(saveEvent)

In case you’re wondering how the events flow from the write side to the read side: it relies on Akka PubSub to notify the poller (‘PersistenceIdEventPoller’) of newly available events. Additionally the poller also regularly queries the ‘akka_journal’ table (typically every 5 seconds).

The support of the tagged events depends on the persistent backend. The Cassandra persistence plugin supports it but only with 1 tag per event. (In case you need to apply multiple tags to a single event you can duplicate the message applying one tag to each copy).

Behind the scenes the eventsByTag query is backed by a Cassandra materialised view. That’s great as it keeps the writes atomic: any event written to the ‘akka_journal’ table is automatically replicated by Cassandra into the materialised view.

CREATE MATERIALIZED VIEW warehouse.eventsbytag1 AS
  SELECT 
    tag1, 
    timebucket, 
    timestamp,
    persistence_id, 
    partition_nr, 
    sequence_nr, 
    event, 
    event_manifest, 
    message, 
    ser_id, 
    ser_manifest, 
    writer_uuid
  FROM warehouse.akka_journal
  WHERE persistence_id IS NOT NULL 
  AND partition_nr IS NOT NULL 
  AND sequence_nr IS NOT NULL 
  AND tag1 IS NOT NULL 
  AND timestamp IS NOT NULL 
  AND timebucket IS NOT NULL
  PRIMARY KEY (
    (tag1, timebucket), timestamp, persistence_id, partition_nr, sequence_nr
  )

As we’ve seen the stream of events for a given persistence ID (i.e. the query eventsByPersistenceId) forms an ordered sequence of events indexed by the ‘sequenceNr’. It is a simple structure and provides some useful guarantees.

If event A occurred before event B, A will be before B in the event sequence. It’s a useful property if there is a causal dependency between A and B and your application expects to see A before B.

The usage of a sequence number makes sure there is no hole in the sequence (it’s not possible to receive event 2 before event 1).

By using the eventsByTag query we lose such guarantees. There is no longer a unique sequence number for the stream (a sequence number is per persistence ID, not per tag).

Therefore there is no guarantee that if event A1 is received before event B2 it actually occurred in this order (B2 might occur first and then A1). That means there is no more causal consistency.

Finally the Cassandra materialised view does replicate any data from the base table to the view but it doesn’t guarantee they are written in the same order. (The final order in the view is defined by the clustering key but there is no guarantee on the order in which the data is written). It means the event stream may receive event B before event A even though A was persisted before B.

In most cases this is not a problem but if your application requires such consistency guarantees it’s good to know how the system works and what assumptions hold.
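If your read side does need to notice such situations, one option is to track the last sequence number seen for each persistence ID. The sketch below is plain Scala with hypothetical names (Envelope, classify, process). It assumes sequence numbers start at 1 and that every event of a persistence ID carries the tag; if only some events are tagged, a gap can be perfectly legitimate and this check no longer applies.

```scala
object SequenceTrackingSketch {
  // Only the metadata needed for the check.
  final case class Envelope(persistenceId: String, sequenceNr: Long)

  sealed trait Decision
  case object Apply extends Decision     // the next expected event
  case object Duplicate extends Decision // already processed
  case object Gap extends Decision       // an earlier event has not been seen yet

  def classify(lastSeen: Map[String, Long], e: Envelope): Decision = {
    val last = lastSeen.getOrElse(e.persistenceId, 0L)
    if (e.sequenceNr <= last) Duplicate
    else if (e.sequenceNr == last + 1) Apply
    else Gap
  }

  // Walk through a stream, advancing the per-ID counter only for applied events.
  def process(stream: Seq[Envelope]): (Map[String, Long], Vector[Decision]) =
    stream.foldLeft((Map.empty[String, Long], Vector.empty[Decision])) {
      case ((seen, decisions), e) =>
        val decision = classify(seen, e)
        val updated =
          if (decision == Apply) seen.updated(e.persistenceId, e.sequenceNr) else seen
        (updated, decisions :+ decision)
    }
}
```

What to do on a Gap (wait, re-query, or fail) depends on the application; the sketch only shows how the situation can be detected.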

Conclusion

And that concludes these guidelines on Akka-Persistence. It is a very interesting framework for building distributed applications. It is quite flexible as it allows you to choose your storage backend, … It also requires that you really think through the design of your application as it doesn’t really prevent you from doing the wrong thing.

Building a distributed system is a complex thing and you need to understand how the system you’re using works in order to know which assumptions you can make regarding the consistency guarantees.

  • Cal L F

    Hey Damien, you mention that you lose the causal consistency guarantee (monotonically increasing seqNr) when you use eventsByTag. I was using this piece of information to prevent duplicate data from affecting the integrity of the read-side. Using eventsByTag would still give you causal consistency if your IDs in your operational side (persistenceIds) mapped 1:1 with the IDs in the read side but as you mentioned if they did not (multiple persistenceIds map to a single column in the read side) then you would lose this guarantee.

    Do you know of any techniques, to detect incoming duplicate data when the persistence Ids does not map 1:1 with the read side columns?
    I was thinking that you could store each sequence number and persistence ID in a lookup database. You would first look up this information and see if its already there before you insert it into your read-side. In order to prevent this from taking up a large amount of space, you can set a time-to-live on the data inside the lookup database to stay for a fixed interval so that old data expires. Still, you would have to ensure that this is transactional (write to the read side table + write to the look up database).

    Thanks for your awesome post! I’m going through all your posts and I find them very useful. Please keep ’em coming!

    Cheers
    Cal

    • There is no duplicate in the EventsByTag stream, but the ordering of events is no longer guaranteed. What I understand in your situation is that you have multiple events updating the same record on the read side and you only want to keep the first one somehow.

      You can choose any solution to update your read side only once, but there is no way to guarantee that the event you pick is actually the first one that occurred.

      • Cal L F

        Hey Damien, thanks for getting back to me.

        When I say duplicates, I’m referring to the case where a persistence query goes down and comes back up and we aren’t sure whether we have processed an event from the journal or not, so there may be a chance we are processing an event that we have already read because we didn’t get a chance to persist the offset we had processed.

        Regarding order of events, if you had events A1, A2 and B1, B2 where A1 occurs before A2 and B1 occurs before B2 and A and B refer to different persistence IDs. Are you saying that it’s possible to get A2, A1, B2, B1 or any random interleaving like B2, A1, B1, A2 with eventsByTag. Because to my understanding, you still have causal consistency guarantee present (A1 must occur before A2) but you are now subject to interleaving between persistence IDs.

        Actually there are two situations I’m wondering about. One is where I use eventsByTag but the persistence IDs map 1:1 with a read side column so one persistence Id’s events affect a read side column. The other case is where multiple persistence Ids events affect a read side column. Assuming causal consistency with interleaving, we were thinking of the following approaches.

        In the case where a single persistence ID’s events map 1:1 with a read side column, you could use the technique of storing the sequence number to prevent duplicates from being considered.

        In the case of multiple persistence IDs. You could use a lookup database and keep track of the last seen seqNr for each Persistence ID. You need to ensure that you do a lookup and then in a transaction you update both the read side column and the lookup database with the new event sequence number.

        What do you think about the approaches?

        • The primary key for the eventsByTag materialised view is ((tagId, timebucket), timestamp, persistence_id, partition_nr, sequence_nr). It means events are sorted by timestamp so eventually there is causal consistency between A1, A2 and B1,B2 with interleaving – as you said.

          As I understand eventsByTag is a materialised view which means it is updated asynchronously by Cassandra and therefore it’s possible to observe A2 before A1 (in case A1 hasn’t been added to the materialised view yet so there is only A2 in the view) but then after the view has been updated A1 will appear before A2 (might be worth checking the C* documentation on this point).

          With that in mind, relying on the highest sequence number doesn’t look like a robust solution (unless you can detect holes in the sequence, which seems difficult as eventsByTag most likely doesn’t include all the events of a sequence).

          • Cal L F

            Ah that’s interesting. I didn’t know Materialized Views behaved like that. Thanks for the heads up, I looked at https://www.datastax.com/dev/blog/understanding-materialized-views for a follow up. It seems like the bigger the delay between the read and write side, the better the chance of preserving causal consistency.

            It seems like they have a fix: https://github.com/akka/akka-persistence-cassandra/issues/166
            It appears that when they detect this scenario, they perform a backtracking query to mitigate this. I don’t understand all the details. I’ll try to get back to you on this.

            Thanks for answering my questions!