Back-pressure in Grpc-akkastream

Grpc-akkastream, the akka-stream implementation built on top of GRPC, looks good on the surface, but if you look under the hood there is one problem: it doesn’t provide any support for back-pressure.

Akka-streams (like any reactive-streams implementation) provides back-pressure, and GRPC (at least the Java version) also has an API that supports back-pressure. Let’s see how we can wire everything together and provide a truly reactive akka-stream implementation for GRPC!

Before we start, let’s briefly recap why we need back-pressure. Imagine you have one producer and one consumer. If the consumer consumes messages faster than the producer produces them, everything’s fine: whatever the producer sends, the consumer is able to cope with it. Now imagine that the consumer is slower than the producer. When a new message comes in, the consumer might not be able to handle it straight away, so it stores it in a buffer. As more and more messages arrive, the buffer grows bigger and bigger until the consumer runs out of memory.

The only solution is for the consumer to tell the producer to slow down so that it can cope with the flow. That means there must be a communication channel going back from the consumer to the producer, aimed only at signalling how to adjust the throughput of messages.

Akka-stream back-pressure

All the building blocks of an Akka-stream (Source, Flow and Sink) are actually graph stages.
A graph stage is made of ports and some logic to route/create/consume messages.
There are 2 types of ports:

  • Input port: a port that can receive messages
  • Output port: a port that can emit messages

Using these 2 types of ports it’s possible to build any graph stage:

  • A Source is a graph stage with only one output port (it only emits messages)
  • A Sink is a graph stage with only one input port (it only receives messages)
  • A Flow is a graph stage with one input port and one output port
  • More complex graph stages can have multiple ports and provide routing capabilities, …
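
To make these building blocks concrete, here is how they compose with the regular Akka-stream API (a minimal sketch assuming Akka 2.6+, where the implicit ActorSystem provides the materializer):

import akka.Done
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Sink, Source}

import scala.concurrent.Future

object Shapes extends App {
  implicit val system: ActorSystem = ActorSystem("shapes")

  val source: Source[Int, NotUsed] = Source(1 to 3)              // one output port
  val flow: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)       // one input port, one output port
  val sink: Sink[Int, Future[Done]] = Sink.foreach[Int](println) // one input port

  source.via(flow).to(sink).run() // prints 2, 4, 6
}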

Now, to implement back-pressure, each port provides 2 operations: pull and push.

An output port can only push an element out if it has been pulled by downstream.
An input port has to pull the upstream to receive an element.

In order to know when a port has been pulled or pushed, Akka-stream provides the associated handlers.

trait OutHandler {
  /**
   * Called when the output port is pulled
   */
  def onPull(): Unit

  /**
   * Called when the output port will no longer accept any new elements
   */
  def onDownstreamFinish(): Unit
}
trait InHandler {
  /**
   * Called when the input port has a new element available
   */
  def onPush(): Unit

  /**
   * Called when the input port is finished.
   */
  def onUpstreamFinish(): Unit

  /**
   * Called when the input port has failed.
   */
  def onUpstreamFailure(ex: Throwable): Unit
}

The principle is simple: every time an output port is pulled (signalled by the onPull callback) it is allowed to push an element out with the push method.

On the input port, every time it is ready to receive an element it calls the pull method, and it can then retrieve the element inside the onPush callback (using the grab method).

There is a catch though. The push method can’t be called outside of the onPull callback because everything actually happens sequentially:

  1. The consumer calls the pull method.
  2. This triggers the onPull callback on the producer.
  3. The producer produces an element with the push method (the push method is called inside the onPull callback).
  4. This triggers the onPush callback on the consumer.
  5. The consumer retrieves the element by calling the grab method.
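
To make this cycle concrete, here is a minimal Source stage (a sketch along the lines of the examples in the Akka documentation, not Grpc-akkastream code) that pushes only from inside onPull:

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}

// Emits 1, 2, 3, … producing the next number only when downstream asks for it
class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("numbers.out")
  override val shape: SourceShape[Int] = SourceShape.of(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      private var counter = 0
      setHandler(out, new OutHandler {
        // the output port has been pulled: we may now push exactly one element
        override def onPull(): Unit = {
          counter += 1
          push(out, counter)
        }
      })
    }
}

Note that push is only ever called from inside onPull, which is exactly the constraint described above.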

That’s quite a problem if you’re not in control of when the elements are generated. Let’s imagine that you want to create a Source of elements coming from HTTP requests, or elements published on a Kafka stream, … Obviously you can’t control when the elements are generated and you can’t guarantee that an element will be available when the downstream stages trigger the onPull callback on your Source.

Fortunately there is a solution to this problem: Async callbacks.

Whenever there is an element available you wrap the push call into an async callback.

// create the callback once, e.g. when the stage starts
val callback = getAsyncCallback((value: T) => push(out, value))

// whenever an element becomes available (possibly on another thread)
callback.invoke(element)

Then, when the downstream pulls the source for a new element, the callback kicks in and makes that element available to the downstream.

Now that we know how to implement back-pressure inside Akka-stream let’s see how it’s done in GRPC.

GRPC back-pressure

At first sight GRPC-java doesn’t seem to provide any back-pressure mechanism. The messages are passed using StreamObservers:

trait StreamObserver[T] {
  def onNext(element: T): Unit
  def onCompleted(): Unit
  def onError(t: Throwable): Unit
}

Everything seems to go only one way: whenever an element is available the onNext callback is invoked, and there is no way for the consumer to slow down the producer.

If you scratch the surface a bit you’ll see that the StreamObserver instances actually implement CallStreamObserver.

abstract class CallStreamObserver[T] extends StreamObserver[T] {
  def isReady(): Boolean
  def setOnReadyHandler(handler: Runnable): Unit
  def disableAutoInboundFlowControl(): Unit
  def request(count: Int): Unit
  def setMessageCompression(enable: Boolean): Unit  
}

There are a few interesting things to note here:

  • The request method enables a signalling flow from the consumer to the producer, supporting back-pressure.
  • isReady can be checked before calling onNext in order to avoid flooding the consumer.
  • setOnReadyHandler allows us to register a callback that is invoked when the consumer becomes ready.
  • Finally, we need to disable the automatic flow control in order to control the flow ourselves with the request method.

And that’s it: this is all we need to implement back-pressure in a GRPC service.

This is how it’s done on the client side:
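
A sketch of the pattern (BackPressuredObserver and handle are illustrative names following GRPC-java’s manual flow-control idiom, not the actual library code). The key point is that GRPC hands us the request-side observer in beforeStart, which is where flow control must be configured:

import io.grpc.stub.{ClientCallStreamObserver, ClientResponseObserver}

class BackPressuredObserver[I, O](handle: O => Unit)
  extends ClientResponseObserver[I, O] {

  private var requestStream: ClientCallStreamObserver[I] = _

  override def beforeStart(stream: ClientCallStreamObserver[I]): Unit = {
    requestStream = stream
    // take over the inbound flow control: no more automatic request(1)
    // (the very first message is still requested by the framework)
    requestStream.disableAutoInboundFlowControl()
  }

  override def onNext(response: O): Unit = {
    handle(response)         // process the message completely...
    requestStream.request(1) // ...and only then ask for the next one
  }

  override def onCompleted(): Unit = ()
  override def onError(t: Throwable): Unit = t.printStackTrace()
}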

And somewhat similar on the server side:
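
Here the response observer handed to the service method can be cast down to a ServerCallStreamObserver, which exposes the same flow-control methods. Again a sketch (bidiCall and process are illustrative names):

import io.grpc.stub.{ServerCallStreamObserver, StreamObserver}

// A bidi-streaming handler with manual inbound flow control
def bidiCall[Req, Resp](
  responseObserver: StreamObserver[Resp]
)(process: Req => Resp): StreamObserver[Req] = {
  val serverObserver = responseObserver.asInstanceOf[ServerCallStreamObserver[Resp]]
  serverObserver.disableAutoInboundFlowControl()
  serverObserver.request(1) // ask for the first request message ourselves

  new StreamObserver[Req] {
    override def onNext(request: Req): Unit = {
      // a real implementation should also check serverObserver.isReady
      // before pushing the response (as the Sink stage below does)
      serverObserver.onNext(process(request))
      serverObserver.request(1) // ask for the next request message
    }
    override def onCompleted(): Unit = serverObserver.onCompleted()
    override def onError(t: Throwable): Unit = serverObserver.onError(t)
  }
}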

It works great for bidirectional and client-streaming calls. It’s not quite the case for server-streaming calls. Why? Because when you call a server-streaming endpoint you only provide a StreamObserver for the responses, and the GRPC framework doesn’t return anything back.

Therefore it’s not possible to disable the automatic flow control, nor to request more elements. Depending on your implementation it might not be too much of a problem, as the automatic request call is made after the onNext callback has been invoked. As long as you completely process the message inside the onNext callback, your service shouldn’t be overflowed. If not … well, you have to dig deeper.
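
In other words, with automatic flow control the onNext callback itself acts as the throttle. A sketch (process is an illustrative handler):

import io.grpc.stub.StreamObserver

// With automatic flow control GRPC requests the next message only after
// onNext returns, so fully processing the message here throttles the stream
def throttledObserver[Resp](process: Resp => Unit): StreamObserver[Resp] =
  new StreamObserver[Resp] {
    override def onNext(response: Resp): Unit =
      process(response) // return only once the message is fully handled
    override def onCompleted(): Unit = ()
    override def onError(t: Throwable): Unit = t.printStackTrace()
  }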

And if you look at the CallStreamObserver implementations you’ll see that they actually use a ClientCall or a ServerCall, both of which provide a request method along with a Listener interface.

abstract class ClientCall[I, O] {
  abstract class Listener[T] {
    def onHeaders(headers: Metadata): Unit
    def onMessage(message: T): Unit
    def onClose(status: Status, trailers: Metadata): Unit
    def onReady(): Unit
  }
  def start(listener: Listener[O], headers: Metadata): Unit
  def request(count: Int): Unit
  def cancel(message: String, cause: Throwable): Unit
  def halfClose(): Unit
  def sendMessage(message: I): Unit
  def isReady(): Boolean
  def setMessageCompression(enable: Boolean): Unit
}

It’s always possible to use these lower-level implementations to provide back-pressure whatever the type of call you’re making, the drawback being that you have to re-implement the logic present in the CallStreamObservers.
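
For instance, a back-pressured server-streaming call driven directly through a ClientCall could look like this (a sketch; channel, methodDescriptor, request and process are assumed to be available):

import io.grpc.{CallOptions, ClientCall, ManagedChannel, Metadata, MethodDescriptor, Status}

// Drive a server-streaming call at the ClientCall level so that we control
// request() ourselves, which the generated stub wouldn't let us do
def serverStreaming[Req, Resp](
  channel: ManagedChannel,
  methodDescriptor: MethodDescriptor[Req, Resp],
  request: Req
)(process: Resp => Unit): Unit = {
  val call = channel.newCall(methodDescriptor, CallOptions.DEFAULT)
  call.start(new ClientCall.Listener[Resp] {
    override def onMessage(message: Resp): Unit = {
      process(message) // handle the response...
      call.request(1)  // ...then explicitly ask for the next one
    }
    override def onClose(status: Status, trailers: Metadata): Unit = ()
  }, new Metadata())
  call.sendMessage(request)
  call.halfClose() // a server-streaming call sends a single request
  call.request(1)  // ask for the first response
}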

GRPC-Akkastream: Putting it all together

Now that we’ve got a good idea of how back-pressure is implemented inside both GRPC-java and Akka-stream, let’s see how we can put everything together and provide a fully reactive akka-stream interface for GRPC.

We need 3 pieces:

  • A reactive Source
  • A reactive Flow
  • A reactive Sink

On the server side, the Source is going to provide incoming messages to the service Flow (implemented by the user). The output of the Flow is connected to the Sink, which sends the responses back to the client.

On the Source stage, every time an element is pulled by the downstream we need to request one more element from the GRPC observer using the request method.

import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStageLogic, GraphStageWithMaterializedValue, OutHandler}
import io.grpc.stub.{CallStreamObserver, StreamObserver}

import scala.concurrent.{Future, Promise}

class GrpcSourceStage[I, O](requestStream: CallStreamObserver[O])
  extends GraphStageWithMaterializedValue[SourceShape[I], Future[StreamObserver[I]]] {
  val out = Outlet[I]("grpc.out")
  override val shape: SourceShape[I] = SourceShape.of(out)

  override def createLogicAndMaterializedValue(
    inheritedAttributes: Attributes
  ): (GraphStageLogic, Future[StreamObserver[I]]) = {
    // materialised value: the observer that GRPC will feed with incoming messages
    val promise: Promise[StreamObserver[I]] = Promise()

    val logic = new GraphStageLogic(shape) with OutHandler {
      // GRPC invokes these callbacks on its own threads, so each one is
      // routed into the stage through an async callback
      val inObs = new StreamObserver[I] {
        override def onError(t: Throwable) =
          getAsyncCallback((t: Throwable) => fail(out, t)).invoke(t)

        override def onCompleted() =
          getAsyncCallback((_: Unit) => complete(out)).invoke(())

        override def onNext(value: I) =
          getAsyncCallback((value: I) => push(out, value)).invoke(value)
      }

      // downstream demand translates directly into a GRPC request
      override def onPull(): Unit = requestStream.request(1)

      override def preStart(): Unit = {
        // take over the flow control: no more automatic request(1)
        requestStream.disableAutoInboundFlowControl()
        promise.success(inObs)
      }

      setHandler(out, this)
    }

    (logic, promise.future)
  }
}

On the Sink stage, every time we receive an element from upstream we check if the GRPC observer is ready. If it is we call its onNext method. If not we buffer the element and the onNext method will be called by the onReadyHandler when the observer becomes ready.

import akka.stream.{Attributes, Inlet, SinkShape}
import akka.stream.stage.{GraphStage, GraphStageLogic, InHandler}
import io.grpc.stub.CallStreamObserver

class GrpcSinkStage[I](observer: CallStreamObserver[I]) extends GraphStage[SinkShape[I]] {
  val in = Inlet[I]("grpc.in")
  override val shape: SinkShape[I] = SinkShape.of(in)
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) with InHandler with Runnable {
      // at most one element buffered while the observer is not ready
      var element: Option[I] = None

      // the onReadyHandler fires on a GRPC thread: re-enter the stage safely
      private val onReady = getAsyncCallback[Unit] { _ =>
        element match {
          case Some(value) if observer.isReady =>
            element = None // clear the buffer so the value isn't sent twice
            observer.onNext(value)
            tryPull(in)
          case _ => ()
        }
      }

      override def run(): Unit = onReady.invoke(())

      override def onPush(): Unit = {
        val value = grab(in)
        if (observer.isReady) {
          observer.onNext(value)
          tryPull(in)
        } else element = Some(value) // buffer until the observer is ready
      }
      override def onUpstreamFinish(): Unit =
        observer.onCompleted()
      override def onUpstreamFailure(t: Throwable): Unit =
        observer.onError(t)
      override def preStart(): Unit = pull(in)

      observer.setOnReadyHandler(this)
      setHandler(in, this)
    }
}

On the client side, the source of requests is provided by the user. We then need to connect it to a Flow that sends requests to the server and emits server responses to a Sink provided by the user.

The Flow stage is simply a combination of the Source and Sink above.
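
For instance, a server-side bidi-streaming endpoint could be wired like this (a sketch; wireCall and serviceFlow are illustrative names):

import akka.stream.Materializer
import akka.stream.scaladsl.{Flow, Keep, Sink, Source}
import io.grpc.stub.{CallStreamObserver, StreamObserver}

import scala.concurrent.Future

// GRPC hands us the response observer and expects a request observer back
def wireCall[Req, Resp](
  responseObserver: StreamObserver[Resp],
  serviceFlow: Flow[Req, Resp, _] // the user's service logic
)(implicit mat: Materializer): Future[StreamObserver[Req]] = {
  val callObserver = responseObserver.asInstanceOf[CallStreamObserver[Resp]]
  Source.fromGraph(new GrpcSourceStage[Req, Resp](callObserver))
    .via(serviceFlow)
    .toMat(Sink.fromGraph(new GrpcSinkStage[Resp](callObserver)))(Keep.left)
    .run() // materialises the request observer to hand back to GRPC
}

Since the GRPC stub API expects the request observer synchronously, the materialised Future then has to be resolved before the observer is handed back to GRPC.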

Overall the most confusing part is that you need to call request on the opposite observer, e.g. calling request on the CallStreamObserver[I] actually asks GRPC to send more responses (of type O) to the StreamObserver[O].

Conclusion

All the implementation details are available inside this Github pull request. There are of course some possible optimisations, like buffering more elements and requesting batches of elements at once …

For those interested in a Java reactive stream implementation there is an implementation supporting back-pressure provided by Salesforce called reactive-grpc.