As we’ve seen in this comparison with Apache Cassandra, DynamoDB may be a valuable choice for storing data. It’s fully managed by AWS: you just have to configure your instance and you can start using it straight away.
Being one of the Amazon Web Services, DynamoDB exposes a JSON/HTTP interface and Amazon provides a set of clients for a variety of languages. Unfortunately Scala is not one of them, but there is a Java client available. Sadly, it suffers from serious performance issues due to poor design choices.
So what are the alternatives? That’s what we’re going to explore in this post: how to get the most out of the official driver, and whether there are better alternatives in terms of both performance and integration with the Scala ecosystem.
The AWS Java client
At the time of writing of this post, the official AWS Java client for DynamoDB is based on the Apache HTTP client and uses blocking calls. This is very unfortunate as making an HTTP call takes a very long time (with respect to CPU cycles). It might be useful for testing purposes but it’s definitely not something I’d like to use in production code.
There are actually two clients:
- AmazonDynamoDB is the blocking client I just mentioned
- AmazonDynamoDBAsync is an “async” client that doesn’t block the calling thread
AmazonDynamoDBAsync is clearly a better option but it’s still not truly async. It still relies on Java’s Future (which blocks as soon as you try to access the value inside it) and uses a thread pool (a FixedThreadPool to be exact) to perform the blocking call when sending the HTTP request. That’s clearly a limiting factor in a high-throughput application.
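One common workaround for the Java Future part is to use the callback-based variants of the async client and adapt them to a Scala Future with a Promise. Here is a minimal sketch (assuming the SDK v1 AsyncHandler API); note that it only avoids blocking the calling thread, the request itself is still executed by the client’s internal thread pool:

import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsync
import com.amazonaws.services.dynamodbv2.model.{GetItemRequest, GetItemResult}
import scala.concurrent.{Future, Promise}

// Adapt the callback-based getItemAsync into a Scala Future
def getItem(client: AmazonDynamoDBAsync, request: GetItemRequest): Future[GetItemResult] = {
  val promise = Promise[GetItemResult]()
  client.getItemAsync(request, new AsyncHandler[GetItemRequest, GetItemResult] {
    def onSuccess(req: GetItemRequest, result: GetItemResult): Unit = promise.success(result)
    def onError(exception: Exception): Unit = promise.failure(exception)
  })
  promise.future
}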
In fact it might happen that your application cannot reach your DynamoDB provisioned throughput just because there are no more threads available to perform additional requests. So what can you do?
Of course, use bigger thread pools. This is easily configurable (see the sketch below), but you’re going to reach a point where there is too much contention on the thread pool’s job queue. You might then prefer to use two (or more) thread pools and dispatch requests among them. But clearly, using one thread per request doesn’t scale well.
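For reference, here is what configuring a bigger pool could look like (a sketch based on the SDK v1 builder API; the pool size of 100 is arbitrary, and region/credentials are assumed to come from the environment):

import java.util.concurrent.{ExecutorService, Executors}
import com.amazonaws.client.builder.ExecutorFactory
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBAsyncClientBuilder

// Replace the default executor with a larger fixed pool
val asyncClient = AmazonDynamoDBAsyncClientBuilder
  .standard()
  .withExecutorFactory(new ExecutorFactory {
    override def newExecutor(): ExecutorService = Executors.newFixedThreadPool(100)
  })
  .build()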
Amazon is very aware of this limitation and is currently working on a new version of its Java client. This new client is based on Netty – one of the fastest HTTP libraries on the JVM – and offers a truly async interface based on Java 8’s CompletableFuture.
This new client is certainly an option worth considering. However, at the time of writing of this post it is still in “developer preview” and might change according to feedback from the community.
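To give an idea of what the new interface looks like, here is a sketch of a getItem call. The names are taken from the 2.x SDK as it was eventually released, so the developer preview mentioned here may differ slightly:

import java.util.concurrent.CompletableFuture
import scala.collection.JavaConverters._
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.{AttributeValue, GetItemRequest, GetItemResponse}

// Region and credentials are assumed to be picked up from the environment
val v2Client = DynamoDbAsyncClient.builder().build()

val key = Map(
  "Artist"    -> AttributeValue.builder().s("No One You Know").build(),
  "SongTitle" -> AttributeValue.builder().s("Call Me Today").build()
).asJava

// getItem returns immediately with a CompletableFuture; no thread is blocked
val response: CompletableFuture[GetItemResponse] =
  v2Client.getItem(GetItemRequest.builder().tableName("Music").key(key).build())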
The API
The client is just a wrapper around HTTP calls. It is supposed to make the developer’s life easier, but I cannot say that I particularly appreciate the Java interface. It relies heavily on the “builder” pattern in order to provide a “fluent” interface.
import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, GetItemRequest}
import scala.collection.JavaConverters._

val client = AmazonDynamoDBClientBuilder.standard().build()

val key = Map(
  "Artist"    -> new AttributeValue().withS("No One You Know"),
  "SongTitle" -> new AttributeValue().withS("Call Me Today")
).asJava

val request = new GetItemRequest().withTableName("Music").withKey(key)
val response = client.getItem(request)
Basically the client uses a Map[String, AttributeValue] to represent the DynamoDB-specific JSON syntax:
{ "Artist": {"S": "No One You Know"}, "SongTitle": {"S": "Call Me Today"} }
Alternatively, there is a higher-level API that uses POJO annotations and a DynamoDBMapper to generate the JSON corresponding to your data model.
Given all the JSON libraries available in Scala, there is certainly a better option: define our data model as an ADT (a set of sealed traits and case classes) and have a library handle the JSON transformation for us. This is exactly what Scanamo does.
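As a quick illustration of what such an ADT-based data model could look like (just an example, not tied to any particular library yet):

// A small ADT: a sealed trait with a fixed set of alternatives,
// plus a case class that uses it
sealed trait PaymentMethod
case object Cash extends PaymentMethod
final case class Card(number: String) extends PaymentMethod

final case class Order(id: String, amount: BigDecimal, payment: PaymentMethod)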
Scanamo
Scanamo is a Scala library for DynamoDB developed by the Guardian.
It provides two components:
- DynamoFormat, which provides a mapping from your ADT case classes to the AWS AttributeValue
- ScanamoOps, the DynamoDB operations modelled as a Free monad
Of course it glues everything together into a simple syntax for querying DynamoDB:
// The data model
case class Farm(animals: List[String])
case class Farmer(name: String, age: Long, farm: Farm)

// DynamoDB ops
val farmTable = Table[Farmer]("farmer")

val ops = for {
  _ <- farmTable.putAll(Set(
    Farmer("Boggis", 43L, Farm(List("chicken"))),
    Farmer("Bunce", 52L, Farm(List("goose"))),
    Farmer("Bean", 55L, Farm(List("turkey")))
  ))
  bunce <- farmTable.get('name -> "Bunce")
} yield bunce

// Execute the ops
Await.result(ScanamoAsync.exec(client)(ops), 5.seconds)
DynamoFormat
DynamoFormat is pretty interesting in itself. It is actually a type class with methods to transform to/from AttributeValue (which in turn is transformed into a JSON string to be sent over the wire).
@typeclass trait DynamoFormat[T] {
  def read(av: AttributeValue): Either[DynamoReadError, T]
  def write(t: T): AttributeValue
  def default: Option[T] = None
}
The interesting bit is that it relies on Shapeless to automatically derive type class instances for your ADT.
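For example, given the Farmer case class from the example above, an instance can be summoned without writing any mapping code (a sketch, assuming the com.gu.scanamo package names of the versions available at the time):

import com.gu.scanamo.DynamoFormat

// The format for Farmer (and, recursively, Farm) is derived by Shapeless
val farmerFormat = DynamoFormat[Farmer]

val av     = farmerFormat.write(Farmer("Boggis", 43L, Farm(List("chicken"))))
val boggis = farmerFormat.read(av) // Right(Farmer("Boggis", 43, Farm(List("chicken"))))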
It’s also easy to define additional formats for specific classes (e.g. for java.time.Instant):
import java.time.Instant

implicit val dateFormat: DynamoFormat[Instant] =
  coercedXmap[Instant, String, IllegalArgumentException](Instant.parse)(_.toString)
Like most Scala JSON libraries, it relies on implicit resolution to find the right format.
This is a pretty nice library all by itself and I wish the Guardian had released it on its own (much like play-json is released outside of Play) but more on that later.
Scanamo Operations
ScanamoOpsA represents all the DynamoDB operations along with their return types.
sealed trait ScanamoOpsA[A] extends Product with Serializable
final case class Put(req: ScanamoPutRequest) extends ScanamoOpsA[PutItemResult]
final case class Get(req: GetItemRequest) extends ScanamoOpsA[GetItemResult]
final case class Delete(req: ScanamoDeleteRequest) extends ScanamoOpsA[DeleteItemResult]
final case class Update(req: ScanamoUpdateRequest) extends ScanamoOpsA[UpdateItemResult]
// ...
These operations are then lifted into a Free monad so that they can be easily combined.
object ScanamoOps {
  import cats.free.Free.liftF

  def put(req: ScanamoPutRequest): ScanamoOps[PutItemResult] =
    liftF[ScanamoOpsA, PutItemResult](Put(req))

  def get(req: GetItemRequest): ScanamoOps[GetItemResult] =
    liftF[ScanamoOpsA, GetItemResult](Get(req))

  // ...
}
where ScanamoOps is just a Free monad:

type ScanamoOps[A] = Free[ScanamoOpsA, A]
Having a (Free) monad, we can combine operations in for-comprehensions. This is what was done in the example above, where we performed a put followed by a get request:
val ops = for {
  _ <- farmTable.putAll(Set(
    Farmer("Boggis", 43L, Farm(List("chicken"))),
    Farmer("Bunce", 52L, Farm(List("goose"))),
    Farmer("Bean", 55L, Farm(List("turkey")))
  ))
  bunce <- farmTable.get('name -> "Bunce")
} yield bunce
Note that nothing is executed here. It’s just a description (or blueprint) of the operations to be executed on DynamoDB. The farmTable.get will return an Option[Either[DynamoReadError, Farmer]], but in order to get it we need an interpreter that can turn a Free[ScanamoOpsA, Option[Either[DynamoReadError, Farmer]]] into a Future[Option[Either[DynamoReadError, Farmer]]] (or directly into an Option[Either[DynamoReadError, Farmer]] if we’re using the blocking sync client).
That’s a bit of a convoluted return type, but every type is there for a reason (handling it is sketched right after this list):
- The Either[DynamoReadError, Farmer] is here in case the returned JSON can’t be deserialised into a Farmer
- The surrounding Option indicates the possibility that no matching record was found in DynamoDB
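Consuming such a result then boils down to a pattern match. A minimal sketch (assuming DynamoReadError lives in com.gu.scanamo.error, as in the versions available at the time):

import com.gu.scanamo.error.DynamoReadError

// `result` is what you get back from running the `ops` above with one of the interpreters
def report(result: Option[Either[DynamoReadError, Farmer]]): String = result match {
  case Some(Right(farmer)) => s"Found: $farmer"
  case Some(Left(error))   => s"Record exists but could not be decoded: $error"
  case None                => "No matching record"
}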
Scanamo provides two interpreters (both of them rely on the AWS Java client):
- ScanamoInterpreters.id, which relies on the AWS sync client
- ScanamoInterpreters.future, which relies on the AWS async client
To execute the above operations we need to foldMap over them using one of these two interpreters.
val interpreter = ScanamoInterpreters.future(asyncClient)
ops.foldMap(interpreter)
Alternatively Scanamo provides some nicer syntax to do just that:
val res = Scanamo.exec(client)(ops)
// or
val futureRes = ScanamoAsync.exec(asyncClient)(ops)
Note that DynamoDB imposes some limitations on the size of batch requests, but Scanamo provides a nice way to get around them:
def putAll[T](tableName: String)(items: Set[T])(implicit f: DynamoFormat[T]): ScanamoOps[List[BatchWriteItemResult]] =
  items.grouped(batchSize).toList.traverseU(batch =>
    ScanamoOps.batchWrite(
      new BatchWriteItemRequest().withRequestItems(
        Map(tableName -> batch.toList.map(i =>
          new WriteRequest().withPutRequest(new PutRequest().withItem(f.write(i).getM))
        ).asJava).asJava
      )
    )
  )

// deleteAll and getAll work similarly
Here it creates as many BatchWriteItemRequests as needed to handle all the items passed in.
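To make the batching concrete: DynamoDB caps BatchWriteItem at 25 items per request, so writing a larger set with putAll transparently produces several batch requests. A small sketch, reusing the farmTable from the earlier example:

// 60 farmers end up in several BatchWriteItemRequests under the hood
// (3 of them, assuming Scanamo batches at the DynamoDB maximum of 25)
val manyFarmers: Set[Farmer] =
  (1 to 60).map(i => Farmer(s"farmer-$i", 40L, Farm(List("chicken")))).toSet

val writeAll = farmTable.putAll(manyFarmers)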
To conclude, Scanamo is a very handy library for working with DynamoDB as it provides tools that integrate nicely with Scala. The use of the Free monad makes it possible to compose multiple requests together, but it’s well hidden behind simple APIs so that you can use the library without being aware of such implementation details.
The interpreters provided by Scanamo are based on the AWS Java client and as such suffer from the same performance issues.
The good news is that providing a new interpreter based on a different library is easy to do and, above all, runs all of your operations without any code changes. So the question is: is there any other DynamoDB client on the JVM?
Alpakka
It turns out there is another DynamoDB client on the JVM, in the form of an Akka Streams connector that is part of the Alpakka project.
Alpakka aims at providing all sorts of connectors for Akka Streams (Kafka, DynamoDB, SQS, Kinesis, Cassandra, AMQP, JMS, Elasticsearch, …).
If you look at the connector for DynamoDB you’ll find a rather simple implementation and at its heart is this method:
override protected val connection: AwsConnect =
  if (settings.port == 443)
    Http().cachedHostConnectionPoolHttps[AwsRequestMetadata](settings.host)(materializer)
  else
    Http().cachedHostConnectionPool[AwsRequestMetadata](settings.host, settings.port)(materializer)
What it means is that it doesn’t use the AWS Java client to send requests but Akka HTTP (a truly non-blocking HTTP client based on Akka Streams!).
If we look closer at the Alpakka connector we see that it interfaces with the request/response entities provided by the AWS Java client and transforms them into Akka HTTP requests/responses.
That makes the client not very pleasant to use (Java builders, no automatic conversion from the data model ADT, …) but having a non-blocking client is a huge win!
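For a taste of direct usage, here is a rough sketch of a getItem call through the connector; the construction of the DynamoClient itself (actor system, materializer, connection settings) is left out because it varies between Alpakka versions:

import akka.stream.alpakka.dynamodb.scaladsl.DynamoClient
import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits._
import com.amazonaws.services.dynamodbv2.model.{AttributeValue, GetItemRequest, GetItemResult}
import scala.collection.JavaConverters._
import scala.concurrent.Future

// `client` is assumed to be an already-configured Alpakka DynamoClient
def getSong(client: DynamoClient): Future[GetItemResult] = {
  val key = Map(
    "Artist"    -> new AttributeValue().withS("No One You Know"),
    "SongTitle" -> new AttributeValue().withS("Call Me Today")
  ).asJava

  // Still the Java builders, but the request is executed by Akka HTTP
  client.single(new GetItemRequest().withTableName("Music").withKey(key))
}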
The best of both worlds
Knowing this, you’ve probably guessed where this is going by now: Can we use the Alpakka client in Scanamo?
Well, it turns out that yes, we can. If you think about it, the Scanamo interpreters are using the AWS Java client, so they must turn the Scanamo operations into AmazonWebServiceRequests at some point.
And there is an object that does just that (although it is package private):
private[ops] object JavaRequests {
  def scan(req: ScanamoScanRequest): ScanRequest = ???
  def query(req: ScanamoQueryRequest): QueryRequest = ???
  def put(req: ScanamoPutRequest): PutItemRequest = ???
  def delete(req: ScanamoDeleteRequest): DeleteItemRequest = ???
  def update(req: ScanamoUpdateRequest): UpdateItemRequest = ???
}
Then the job of the interpreter is simple: turn the Scanamo operation into an AWS request (using JavaRequests) and use the AWS client to execute this request.
def id(client: AmazonDynamoDB) = new (ScanamoOpsA ~> Id) {
  def apply[A](op: ScanamoOpsA[A]): Id[A] = op match {
    case Put(req)    => client.putItem(JavaRequests.put(req))
    case Get(req)    => client.getItem(req)
    case Delete(req) => client.deleteItem(JavaRequests.delete(req))
    // ...
  }
}
The idea for implementing a new interpreter is simply to replace the AWS client with an Alpakka connector:
def future(client: DynamoClient)(implicit executor: ExecutionContext): ScanamoOpsA ~> Future =
  new (ScanamoOpsA ~> Future) {
    import akka.stream.alpakka.dynamodb.scaladsl.DynamoImplicits._

    override def apply[A](ops: ScanamoOpsA[A]): Future[A] = ops match {
      case Put(req)    => client.single(JavaRequests.put(req))
      case Get(req)    => client.single(req)
      case Delete(req) => client.single(JavaRequests.delete(req))
      // ...
    }
  }
And there you have it! A truly non-blocking interpreter for Scanamo. The best of both worlds: a nice, easy syntax with a performant client!
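Wiring it all together could then look like this (a sketch: alpakkaClient stands for an already-configured Alpakka DynamoClient, and future is the interpreter defined just above):

import cats.~>
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future

val interpreter: ScanamoOpsA ~> Future = future(alpakkaClient)

// `ops` is the put-then-get program from the Scanamo example earlier
val bunce: Future[Option[Either[DynamoReadError, Farmer]]] = ops.foldMap(interpreter)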
Conclusion
When it comes to accessing DynamoDB there are not a lot of options on the JVM (to my knowledge). However, it’s still a web service, so any HTTP client can do the job. The other aspect is how easy it is to build the requests, and this is where Scanamo really shines.
It would have made sense to split Scanamo into two artifacts:
- One library to build AWS requests, including the DynamoFormats and a way to build the requests outside of the Free monad (see the sketch after this list)
- One library, based on the first, that uses the Free monad to compose DynamoDB operations and execute them
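As a hint of what the “requests only” half could look like, here is a sketch that builds an AWS PutItemRequest directly from a case class using DynamoFormat, bypassing the Free monad entirely (putRequest is a hypothetical helper, not part of Scanamo):

import com.amazonaws.services.dynamodbv2.model.PutItemRequest
import com.gu.scanamo.DynamoFormat

// Hypothetical helper: case class -> AWS request, no ScanamoOps involved
def putRequest[T](tableName: String)(item: T)(implicit f: DynamoFormat[T]): PutItemRequest =
  new PutItemRequest()
    .withTableName(tableName)
    .withItem(f.write(item).getM)

val req = putRequest("farmer")(Farmer("Boggis", 43L, Farm(List("chicken"))))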
As always, you can find (and try) these ideas in this GitHub repo.