The Cassandra Java Driver

Tweet about this on TwitterShare on LinkedInShare on FacebookShare on Google+Share on Reddit

Cassandra drivers are not just a dumb piece of software that sends CQL strings to a Cassandra node and waits for responses.

They are actually quite smart and are architectured in a way that should make your life easier while still attempting to get the most performance out of Cassandra.

In this post I am going to focus on the Java driver, have a quick look at its architecture and on some of the features it offers.

Architecture

The Cassandra Java drivers offers an asynchronous API. Note that it also provides a synchronous API but I am not going to cover this one because it’s built on top of the async API and I don’t want to block my application thread to interact with Cassandra.

Let’s go through a bottom-up approach of the components of the driver.

Connections

At the very bottom is the connection to a Cassandra node. The Cassandra protocol is fully asynchrone. It means we can send multiple requests over the same connection. We don’t have to wait for a single request to complete before sending the next one. Each request is identified by a correlation ID (which is actually called a “stream id”) and this ID is also set in the response so that the driver can match the response with the corresponding request.

The driver relies on Netty to perform async IO operations.

As soon as the request is sent to the connection session.executeAsync returns a Future, which is then completed using a promise when the corresponding response is received (or when a timeout or error occurs).

The on-going requests (aka “in-flight” requests) are stored in a queue. When the queue is full you can no longer sends query to Cassandra. executeAsync fails by returning a failed future. Before version 3.1 the calling thread was blocking, waiting for the connection to be available. Of course the queue size is configurable inside poolingOptions.

val poolingOptions = new PoolingOptions()
poolingOptions
  .setMaxRequestsPerConnection(HostDistance.LOCAL, 32768)
  .setMaxRequestsPerConnection(HostDistance.REMOTE, 2000)

val cluster = Cluster.builder()
    .withContactPoints("127.0.0.1")
    .withPoolingOptions(poolingOptions)
    .build()

The default are pretty low (1024 for local connections and 256 for remote connections). 256 can easily be used in a production application so I’d definitely recommend to tune these values to your need.

The connections are kept open using TCP keep-alive or sending an application heartbeat to maintain the connection open.

Connection pool

Connections belong to a connection pool. The driver maintains one connection pool per Cassandra node. The connection pools can be configured by means of the poolingOptions too.

poolingOptions
    .setConnectionsPerHost(HostDistance.LOCAL,  4, 10)
    .setConnectionsPerHost(HostDistance.REMOTE, 2, 4)

The main configuration is the pool size. The number of available connection can evolve between core and max according to the load. We can also set different settings for local or remote datacenters. When a connection has been idle for too long the connection is closed until the pool size reaches its core size.

Session

The connection pools belong to a Session.

The session is also the object the application uses to communicate with Cassandra.

This layer abstract all the connections management from the application.

val session = cluster.connect()

Session provides all the API to communicate with Cassandra like session.executeAsync which allows the application to send a request to Cassandra or session.getState which allows us to inspect and monitor the connected hosts or number of in-flights queries.

Cluster

Cluster is the top level abstraction. This is where we can configure everything like specifying the pooling options, the load balancing policy, the retry policy or the default consistency level.

val cluster = Cluster.builder()
    .withContactPoints("127.0.0.1")
    .withPoolingOptions(poolingOptions)
    .withLoadBalancingPolicy(new RoundRobinPolicy())
    .withQueryOptions(
      new QueryOptions().setConsistencyLevel(ConsistencyLevel.LOCAL_QUORUM)
    )
    .build()

Bootstrapping

When the driver first connects to one of the seed nodes, it establishes a control connection that it uses to discover the cluster topology. Basically it queries the system tables in Cassandra.

When bootstrapping the seed node is selected at random from the list of seed nodes in order to avoid using always the same node for the initial cluster discovery.

Load-balancing

The load balancing is in charge of establishing connections to entire Cassandra cluster (not just on one node) and to maintain connection pools to every host in the cluster.
It holds the logic to send certain requests to certain nodes. Which hosts to establish connection to and which hosts to send requests to is determined by the load balancing policy.

In fact for each request a query plan is computed. The query plan determines which hosts to send request to and in which order (depending on the speculative execution policy and the retry policy).

The load balancing also determines if host is local or remote (as different settings apply).

Finally it’s possible to code its own load balancing policy.

The driver extract the partition key from the request and using the correct hash algorithm determines the Cassandra nodes holding the partition.

The default policy is the DatacenterAwareLoadBalancingPolicy. It is both:

  • datacenter aware: determines which nodes belong to the local datacenter and which ones belong to a remote datacenter. The driver then sends requests only to the local datacenter and uses the remote datacenter as a fallback.
  • token aware: Finds the partition key for the request and hashes it with same algorithm than the cluster. Then it sends the request to a node responsible for the token (chosen randomly among the replica for that partition).

It’s possible to specify the local datacenter when using the DDCAwareRoundRobinPolicy:

Cluster cluster = Cluster.builder()
  .addContactPoint("127.0.0.1")
  .withLoadBalancingPolicy(
    DCAwareRoundRobinPolicy.builder()
      .withLocalDc("myLocalDC")
      .withUsedHostsPerRemoteDc(2)
      .allowRemoteDCsForLocalConsistencyLevel()
      .build()
    )
  )
  .build()

Fault tolerance

There are mainly 3 types of errors:

  • Invalid requests: The error flows back directly to the application as there is no way for the driver to know what to do with such requests
  • Server errors: The driver can try the next node according to the load-balancing policy
  • Network timeouts: The driver can retry the request if the request is marked as idempotent. By default requests are not considered idempotent so it’s always a good practice to mark requests as such when possible.

With idempotent requests the driver can send the request to a second node if there is no response from the first node within a certain delay. This is known as “speculative retry” and configured using a SpeculativeExecutionPolicy.

val cluster = Cluster.builder()
  .addContactPoint("127.0.0.1")
  .withSpeculativeExecutionPolicy(
    new ConstantSpeculativeExecutionPolicy(
      500, // delay before a new execution is launched
      2    // maximum number of executions
    )
  )
  .build()

Conclusion

The java driver is a versatile piece of machinery. It’s worth spending some time to understand its architecture and how to configure it properly (the max request per connection is specially important as I found out that the default were not very suitable – configuring the local datacenter is also important otherwise the driver may think it is connected to a remote datacenter).

Of course you can find out more information in the official documentation or directly in the source code.

  • Amazing article ! thanks Damien .