Back from holidays, let’s continue with some of my favourite topics: Akka Streams and gRPC.
We’ve already seen how we can take advantage of the ScalaPB code generation tool to generate new interfaces (GRPCMonix) on top of the grpc-java implementation, or to create new tools that integrate gRPC with other services (GRPCGateway).
Similarly to GRPCMonix, which provides a Monix interface (Task, Observable) on top of gRPC, it’s possible to develop an Akka Streams interface on top of gRPC.
Under the hood it’s still the grpc-java implementation that is running (using Netty). The gRPC AkkaStream library is mainly a bridge between gRPC StreamObserver and Akka Streams Flow.
The interface
gRPC defines 4 different use cases:
- Unary calls: a classic request/response interaction
- Server streaming calls: a single client request triggers multiple responses from the server
- Client streaming calls: a client sends multiple requests to the server, which replies with a single response
- Bidirectional streaming calls: a client sends multiple requests to the server, which replies with multiple responses. Client requests and server responses can be intertwined.
The good thing with Akka Streams is that we can model all these use cases using Flow. All the different call types can be modelled as a Flow[Request, Response, NotUsed]: a flow of client requests to server responses.
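To make this concrete, here is a minimal sketch of what a service interface could look like when every call type shares the same Flow signature. The names Request, Response and ExampleService are hypothetical placeholders (the real classes and traits are generated from your .proto file), and the sketch assumes akka-stream is on the classpath.

```scala
import akka.NotUsed
import akka.stream.scaladsl.Flow

// Hypothetical message types standing in for the classes ScalaPB would generate.
final case class Request(payload: String)
final case class Response(payload: String)

// Hypothetical service trait: all four gRPC call types share one signature.
trait ExampleService {
  // Unary: one request in, one response out.
  def unary: Flow[Request, Response, NotUsed]
  // Server streaming: one request in, many responses out.
  def serverStreaming: Flow[Request, Response, NotUsed]
  // Client streaming: many requests in, one response out.
  def clientStreaming: Flow[Request, Response, NotUsed]
  // Bidirectional streaming: many requests in, many responses out.
  def bidiStreaming: Flow[Request, Response, NotUsed]
}
```

The signature alone doesn't enforce the cardinality of each call type; that part remains a contract between the implementation and the caller.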
Installation
gRPC Akka-streams is composed of 2 parts:
- a generator that generates the client stub and the server trait to be implemented
- a runtime library (used by the generated code) to convert between akka-streams Flow and gRPC StreamObservers
In order to use the generator you need to add the ScalaPB plugin and a dependency on the generator to your build. Add a protoc.sbt file in your project folder:

    addSbtPlugin("com.thesamet" % "sbt-protoc" % "0.99.9")

    resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

    libraryDependencies ++= Seq(
      "com.trueaccord.scalapb" %% "compilerplugin" % "0.6.0-pre5",
      "beyondthelines" %% "grpcakkastreamgenerator" % "0.0.1"
    )
Then in your build.sbt add the following:

    resolvers += Resolver.bintrayRepo("beyondthelines", "maven")

    PB.targets in Compile := Seq(
      scalapb.gen() -> (sourceManaged in Compile).value,
      grpc.akkastreams.generators.GrpcAkkaStreamGenerator -> (sourceManaged in Compile).value
    )

    libraryDependencies ++= Seq(
      "com.trueaccord.scalapb" %% "scalapb-runtime" % com.trueaccord.scalapb.compiler.Version.scalapbVersion % "protobuf",
      // for gRPC
      "io.grpc" % "grpc-netty" % "1.4.0",
      "com.trueaccord.scalapb" %% "scalapb-runtime-grpc" % com.trueaccord.scalapb.compiler.Version.scalapbVersion,
      // for GRPC Akkastream
      "beyondthelines" %% "grpcakkastreamruntime" % "0.0.1"
    )
From now on, you’re ready to go: just place a .proto file with some gRPC service definition in src/main/protobuf and it should automatically generate the corresponding akka-streams stubs and traits.
Usage
On the server side
On the server side, this means we just have to provide a Flow implementation that turns client requests into server responses. It can be something as simple as:
    // Unary case
    Flow[Request].map(computeResponse)

    // Server streaming
    Flow[Request].flatMapConcat(computeResponses)

    // Client streaming
    Flow[Request].fold(defaultResponse)(computeResponse)

    // Bidirectional streaming
    Flow[Request].flatMapConcat(computeResponses)
but of course you can use any of the Akka Streams operations available. For instance, we can easily add throttling with the throttle operation:

    Flow[Request]
      .throttle(1, 10.millis, 1, ThrottleMode.Shaping)
      .map(computeResponse)
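The flows above can be tried out without any gRPC machinery, since they are plain Akka Streams values. The following is a minimal sketch under a few assumptions: Request and Response are hypothetical case classes (not generated code), the "compute" functions are trivial placeholders, and the Akka 2.5-style ActorMaterializer is used (on Akka 2.6+ an implicit ActorSystem is enough).

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ThrottleMode}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

object ServerFlowSketch {
  // Hypothetical messages; real ones would be generated by ScalaPB.
  final case class Request(n: Int)
  final case class Response(n: Int)

  // One response per request, shaped to at most one element every 10 ms.
  val throttledDoubling: Flow[Request, Response, NotUsed] =
    Flow[Request]
      .throttle(1, 10.millis, 1, ThrottleMode.Shaping)
      .map(r => Response(r.n * 2))

  // Client-streaming style: fold every request into a single response.
  val summing: Flow[Request, Response, NotUsed] =
    Flow[Request].fold(Response(0))((acc, r) => Response(acc.n + r.n))

  def run(): (Seq[Response], Seq[Response]) = {
    implicit val system: ActorSystem = ActorSystem("server-flow-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try {
      val requests = Source(List(Request(1), Request(2), Request(3)))
      // Run each flow against the same requests and collect the responses.
      val doubled = Await.result(requests.via(throttledDoubling).runWith(Sink.seq), 5.seconds)
      val summed = Await.result(requests.via(summing).runWith(Sink.seq), 5.seconds)
      (doubled, summed)
    } finally system.terminate()
  }
}
```

Calling ServerFlowSketch.run() feeds three requests through each flow: the first yields one doubled response per request, the second a single response holding the sum.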
On the client side
On the client side we just have to use the Flow provided by the stub:

    Source
      .single(request)
      .via(stub.doSomething)
      .runForeach(println)
You can find more details, with a complete example, on GitHub (the example is based on the route guide example from grpc-java).
That’s basically all you need to know to start enjoying working with akka-streams and gRPC.
Implementation
Now a few words on the implementation details. You don’t need to know anything about them to use the library, but I think it might be useful to share as it uses some more advanced akka-streams concepts (e.g. custom graph stages).
The gRPC Akka stream library provides a bridge between the akka-streams Flow and the corresponding StreamObserver used by the underlying grpc-java implementation.
The client side
On the client side the user provides the input (source) and output (sink) and we need to implement a flow connecting the input to the output by invoking the gRPC service.
Unary Calls
Let’s start with the easiest part on the client side, the simple unary call: the classic request/response pattern.
This call is performed by
    public static <ReqT, RespT> ListenableFuture<RespT> futureUnaryCall(
        ClientCall<ReqT, RespT> call,
        ReqT param
    )

This method sends the request (of type ReqT) to the server and returns a ListenableFuture with the response. Fortunately, ScalaPB provides us with a method that converts a ListenableFuture into a Scala Future.
Once we have a Scala Future we can easily turn it into a Source and flatMapConcat the initial flow with it to produce the response:
    Flow[ReqT].flatMapConcat(request =>
      Source.fromFuture(
        Grpc.guavaFuture2ScalaFuture(
          ClientCalls.futureUnaryCall(channel.newCall(METHOD_NAME, options), request)
        )
      )
    )
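The Future-to-Source pattern can be tried in isolation. Below is a minimal sketch in which fakeUnaryCall is a hypothetical stand-in for the converted gRPC future (no channel or server is involved). It keeps the Source.fromFuture call used above; on Akka 2.6+ that method is deprecated in favour of Source.future. The ActorMaterializer is again an Akka 2.5-style assumption.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

object UnaryCallSketch {
  // Hypothetical stand-in for Grpc.guavaFuture2ScalaFuture(ClientCalls.futureUnaryCall(...)).
  def fakeUnaryCall(request: Int)(implicit ec: ExecutionContext): Future[String] =
    Future(s"response-$request")

  def run(): Seq[String] = {
    implicit val system: ActorSystem = ActorSystem("unary-call-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    import system.dispatcher

    // Each request becomes a single-element Source built from the call's Future;
    // flatMapConcat splices those sources together in request order.
    val clientFlow: Flow[Int, String, NotUsed] =
      Flow[Int].flatMapConcat(request => Source.fromFuture(fakeUnaryCall(request)))

    try Await.result(Source(List(1, 2)).via(clientFlow).runWith(Sink.seq), 5.seconds)
    finally system.terminate()
  }
}
```

Because flatMapConcat consumes one inner source at a time, responses come out in the same order as the requests, even though each call completes asynchronously.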
Server streaming
The server streaming call is slightly more complicated as we have to deal with multiple responses from the server. grpc-java provides a StreamObserver to deal with them. We can take advantage of the akka-streams integration with the reactive-streams API to solve this problem (a reactive Subscriber can easily be turned into a StreamObserver).
    Flow[ReqT].flatMapConcat(request =>
      Source.fromPublisher(
        new Publisher[RespT] {
          override def subscribe(subscriber: Subscriber[_ >: RespT]): Unit =
            ClientCalls.asyncServerStreamingCall(
              channel.newCall(METHOD_NAME, options),
              request,
              reactiveSubscriberToGrpcObserver[RespT](subscriber)
            )
        }
      )
    )
We create a Source from a reactive Publisher which, when subscribed, triggers the gRPC call by transforming the subscriber into a StreamObserver.
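To illustrate why a Subscriber maps so easily onto a StreamObserver, here is a sketch of what such an adapter could look like. This is an assumption, not the library's actual helper: the name subscriberToObserver and the RecordingSubscriber used to exercise it are hypothetical. It needs grpc-stub and reactive-streams on the classpath.

```scala
import io.grpc.stub.StreamObserver
import org.reactivestreams.{Subscriber, Subscription}
import scala.collection.mutable.ListBuffer

object ObserverAdapterSketch {
  // Hypothetical adapter: the three StreamObserver callbacks map one-to-one
  // onto the Subscriber callbacks of the same meaning.
  def subscriberToObserver[T](subscriber: Subscriber[_ >: T]): StreamObserver[T] =
    new StreamObserver[T] {
      override def onNext(value: T): Unit = subscriber.onNext(value)
      override def onError(t: Throwable): Unit = subscriber.onError(t)
      override def onCompleted(): Unit = subscriber.onComplete()
    }

  // Hypothetical subscriber that records every signal it receives.
  final class RecordingSubscriber extends Subscriber[String] {
    val events = ListBuffer.empty[String]
    override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
    override def onNext(value: String): Unit = events += value
    override def onError(t: Throwable): Unit = events += s"error:${t.getMessage}"
    override def onComplete(): Unit = events += "complete"
  }

  def run(): List[String] = {
    val subscriber = new RecordingSubscriber
    val observer = subscriberToObserver[String](subscriber)
    // Drive the observer by hand, as grpc-java would when responses arrive.
    observer.onNext("a")
    observer.onNext("b")
    observer.onCompleted()
    subscriber.events.toList
  }
}
```

Note that an adapter this simple never consults the Subscription: elements are forwarded as soon as gRPC delivers them, whatever demand the subscriber has signalled.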
Bidi and client streaming
These 2 cases can be solved the same way (they’re both implemented using a StreamObserver to provide requests and consume responses); the only difference is that client streaming calls produce only one response from the server.
This use case is solved with a custom GraphStage.
    class GrpcGraphStage[I, O](operator: GrpcOperator[I, O]) extends GraphStage[FlowShape[I, O]] {
      val in = Inlet[I]("grpc.in")
      val out = Outlet[O]("grpc.out")
      override val shape: FlowShape[I, O] = FlowShape.of(in, out)

      override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
        new GraphStageLogic(shape) {
          // Observer receiving the server responses and feeding the output port
          val outObs = new StreamObserver[O] {
            override def onError(t: Throwable) =
              fail(out, t)
            override def onCompleted() =
              getAsyncCallback((_: Unit) => complete(out)).invoke(())
            override def onNext(value: O) =
              getAsyncCallback((value: O) => emit(out, value)).invoke(value)
          }
          // Observer used to send the requests to the server
          val inObs = operator(outObs)

          setHandler(in, new InHandler {
            override def onPush(): Unit = {
              val input = grab(in)
              inObs.onNext(input)
              pull(in)
            }
            override def onUpstreamFinish(): Unit = inObs.onCompleted()
            override def onUpstreamFailure(t: Throwable): Unit = inObs.onError(t)
          })
          setHandler(out, new OutHandler {
            override def onPull(): Unit = ()
          })

          override def preStart(): Unit = pull(in)
        }
    }
Here we define a GraphStage with a classic FlowShape: one input and one output. We then need to consume events from the input port and feed them into the input StreamObserver so that they are sent to the server.
The second part is to consume events from the output StreamObserver and feed them to the GraphStage output port.
The problem is that it’s fully asynchronous and we have no control over when the server will provide a response. That’s why we need to use getAsyncCallback to handle the responses.
Moreover a Flow should provide a back-pressure mechanism: produce its output only when asked by the downstream component. As we have no control of when the server produces a response, the only way to deal with back-pressure is to buffer the responses so that they are consumed downstream only when needed.
As you’ve probably noticed, there is no buffer in this implementation. The buffering is in fact hidden by the call to emit. emit stacks new output handlers on top of each other and removes them once their elements are consumed. This is how the buffering is performed here.
Using this custom GraphStage, the implementation is quite straightforward:
    Flow.fromGraph(new GrpcGraphStage[ReqT, RespT](outputObserver =>
      ClientCalls.asyncClientStreamingCall(
        channel.newCall(METHOD_NAME, options),
        outputObserver
      )
    ))
Given an output StreamObserver, we create an input StreamObserver by performing a server call (both asyncClientStreamingCall and asyncBidiStreamingCall return a StreamObserver for the input). The output StreamObserver is created in our custom GraphStage.
The server side
The server side wiring is the opposite of the client side. Here the user provides the flow (the service implementation) and we need to wire the input observer to the input port of the flow and the output port of the flow to the output observer.
Unary and server streaming calls
This is the easy part. We feed a Source of a single request into the server Flow and connect it to a Sink which feeds every output event to a StreamObserver:
    Source
      .single(request)
      .via(serviceImpl.grpcMethod)
      .runForeach(responseObserver.onNext)
      .onComplete {
        case Success(_) => responseObserver.onCompleted()
        case Failure(t) => responseObserver.onError(t)
      }(mat.executionContext)
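This wiring can be exercised without a real gRPC server by handing it an observer that simply records what it receives. The sketch below assumes grpc-stub and akka-stream are on the classpath; RecordingObserver and serviceFlow are hypothetical stand-ins for the gRPC response observer and the user's service implementation, and ActorMaterializer is the Akka 2.5-style materializer.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Source}
import io.grpc.stub.StreamObserver
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.JavaConverters._
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._
import scala.util.{Failure, Success}

object UnaryServerWiringSketch {
  // Hypothetical observer recording the events that would go back to the client.
  final class RecordingObserver[T] extends StreamObserver[T] {
    val received = new ConcurrentLinkedQueue[T]()
    val done = Promise[Unit]()
    override def onNext(value: T): Unit = received.add(value)
    override def onError(t: Throwable): Unit = done.tryFailure(t)
    override def onCompleted(): Unit = done.trySuccess(())
  }

  def run(): List[String] = {
    implicit val system: ActorSystem = ActorSystem("unary-server-wiring-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Hypothetical server-streaming implementation: n responses for request n.
    val serviceFlow = Flow[Int].mapConcat(n => List.fill(n)(s"item-$n"))
    val responseObserver = new RecordingObserver[String]

    // Same wiring as above: one request in, every output pushed to the observer,
    // then completion or failure signalled once the stream terminates.
    Source
      .single(3)
      .via(serviceFlow)
      .runForeach(responseObserver.onNext)
      .onComplete {
        case Success(_) => responseObserver.onCompleted()
        case Failure(t) => responseObserver.onError(t)
      }(system.dispatcher)

    try {
      Await.result(responseObserver.done.future, 5.seconds)
      responseObserver.received.asScala.toList
    } finally system.terminate()
  }
}
```

The same wiring covers both unary and server streaming calls: the only difference is how many elements the service flow emits for the single request.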
Bidi and client streaming calls
This is definitely the tricky part: we need to turn a StreamObserver into a Source.
I initially thought of leveraging the reactive streams API to do it:
    reactiveSubscriberToGrpcObserver(
      serviceImpl
        .grpcMethod
        .to(Sink.fromSubscriber(grpcObserverToReactiveSubscriber(responseObserver)))
        .runWith(Source.asSubscriber[ReqT])
    )

It looks simple and compiles like a charm. However, it fails at runtime:
java.lang.IllegalStateException: spec violation: onNext was signaled from upstream without demand
and again it’s because we have no control of when the events occur which is at odds with the back-pressure provided by akka-streams.
To solve this issue we use the same GraphStage strategy as on the client side, but this time with a source shape (only one output port):
    class GrpcSourceStage[O] extends GraphStageWithMaterializedValue[SourceShape[O], Future[StreamObserver[O]]] {
      val out = Outlet[O]("grpc.out")
      override val shape: SourceShape[O] = SourceShape.of(out)

      override def createLogicAndMaterializedValue(
        inheritedAttributes: Attributes
      ): (GraphStageLogic, Future[StreamObserver[O]]) = {
        val promise: Promise[StreamObserver[O]] = Promise()

        val logic = new GraphStageLogic(shape) {
          // Observer receiving the client requests and feeding the output port
          val observer = new StreamObserver[O] {
            override def onError(t: Throwable) =
              fail(out, t)
            override def onCompleted() =
              getAsyncCallback((_: Unit) => complete(out)).invoke(())
            override def onNext(value: O) =
              getAsyncCallback((value: O) => emit(out, value)).invoke(value)
          }

          setHandler(out, new OutHandler {
            override def onPull(): Unit = ()
          })

          // The observer is only handed out once the stage has started
          override def preStart(): Unit = promise.success(observer)
        }

        (logic, promise.future)
      }
    }
It’s very similar to the GraphStage we used on the client side (emit and async callbacks).
That’s great, but we need to return a StreamObserver linked to this source so that the gRPC framework can provide us with the input events. The only way to do it is to use the materialised value of the source to return the StreamObserver (that’s why it extends GraphStageWithMaterializedValue).
Notice that the materialised value is a Future[StreamObserver[O]]. We use a Future here because we don’t want incoming events to flow before the Source is fully initialised. As a consequence, we have a blocking call to wait for the source initialisation in the generated code:
    Await.result(
      Source
        .fromGraph(new GrpcSourceStage[io.grpc.routeguide.RouteNote])
        .via(serviceImpl.grpcMethod)
        .to(Sink.fromSubscriber(grpcObserverToReactiveSubscriber(responseObserver)))
        .run(),
      5.seconds
    )
Conclusion
Using the reactive streams to make gRPC calls feels really good. However, the gRPC StreamObserver doesn’t play nice with reactive streams because of its lack of back-pressure support.
It’s probably worth investigating how to replace the Netty implementation with Akka-HTTP and look for a nicer integration.
As always you can find the complete source code on github, so don’t hesitate to have a look and contribute!