
Streaming patterns with fs2

After an overview of cats-effect, which deals with single effects, it feels natural to have a look at fs2 and see how multiple effects can be combined into a stream.

As we covered some common streaming patterns with Akka Streams a while ago, it’ll be interesting to see how the two libraries compare.

Overview

First things first, let’s create a Stream using fs2. Unlike Akka Streams, which always requires an actor system and a materializer, an fs2 stream can be created straight away:

val stream = Stream(1, 2, 3, 4)

An interesting thing to note is the type of the stream:

fs2.Stream[fs2.Pure, Int]

So an fs2 stream has two type parameters. The second one is the one you’d expect: it represents the type of the elements of the stream (here Int).

The first type parameter is a type constructor which corresponds to the effect type. Here Pure means that the stream doesn’t require any effect to be evaluated.

If you were to integrate with cats-effect (say, to use cats IO, although fs2 can work with any effect type), you would create a stream of type

fs2.Stream[IO, Int]

Of course, you can no longer use fs2.Stream.apply to build the stream, as the IO effect needs to be evaluated to produce an element of the stream. This is simply done with the eval method:

fs2.Stream.eval(IO(2))

As with cats-effect, creating a Stream doesn’t run it. To “run” the stream we need to “compile” it and then use one of the cats-effect run methods:

fs2.Stream.eval(IO(2))
  .compile
  .toList
  .unsafeRunSync()
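To make the difference between pure and effectful streams concrete, here is a small sketch, assuming fs2 2.x with cats-effect 2 (the object name is only for illustration). A pure stream can be turned into a List directly, while an effectful one is compiled into a single IO, with a few variants depending on what you want back:

```scala
import cats.effect.IO
import fs2.Stream

object CompileExample {
  // A pure stream needs no effect: it can be converted to a List directly
  val pure: List[Int] = Stream(1, 2, 3, 4).toList

  // An effectful stream is first compiled into a single IO, which is then run
  val effectful: Stream[IO, Int] = Stream.eval(IO(2)) ++ Stream.eval(IO(3))

  val all: List[Int] = effectful.compile.toList.unsafeRunSync()     // collects every element
  val last: Option[Int] = effectful.compile.last.unsafeRunSync()    // keeps only the last element
  val sum: Int = effectful.compile.fold(0)(_ + _).unsafeRunSync()   // folds the elements
}
```

compile.drain is the variant to use when you only care about the effects and not about the emitted values.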

Now that we know how to create a stream, let’s jump straight into some usage patterns and see how fs2 compares to Akka Streams.

Patterns

Flattening a stream

Unlike Akka Streams, fs2 doesn’t provide a mapConcat method, but it’s still easy to achieve. We use emits to create a stream from a collection, which we can then flatMap:

fs2.Stream.emits('A' to 'E')
  .map(letter => (1 to 3).map(index => s"$letter$index"))
  .flatMap(labels => fs2.Stream.emits(labels)) // this flattens the stream
  .toList // a pure stream can be converted to a List directly

// or

fs2.Stream.emits('A' to 'E')
  .map(letter => fs2.Stream.emits(1 to 3).map(index => s"$letter$index"))
  .flatten
  .toList


This generates the elements in sequence and gives List(A1, A2, A3, B1, B2, B3, C1, ...).

It processes all the elements of the first inner stream before moving on to the second one. Now let’s imagine that each inner stream is infinite (e.g. A1, A2, A3, A4, A5, ...): in that case the B elements (B1, B2, B3, B4, ...) are never reached.
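A minimal pure sketch of that situation, assuming fs2 2.x (object and method names are illustrative): each letter maps to an infinite stream built with Stream.iterate, and taking the first ten elements of the flattened stream only ever yields A elements.

```scala
import fs2.{Pure, Stream}

object SequentialFlatten {
  // Each letter produces an infinite stream: A1, A2, A3, ...
  def infinite(letter: Char): Stream[Pure, String] =
    Stream.iterate(1)(_ + 1).map(index => s"$letter$index")

  // flatMap consumes the first inner stream before moving on to the next one,
  // so with infinite inner streams only the A elements are ever emitted
  val firstTen: List[String] =
    Stream.emits('A' to 'E').flatMap(infinite).take(10).toList
}
```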

If we want to process the streams in parallel we can use parJoin

fs2.Stream.emits[IO, Char]('A' to 'E')
  .map(letter => fs2.Stream.emits[IO, Int](1 to 3).map(index => s"$letter$index"))
  .parJoin(5)

Now the inner streams are processed concurrently, so the order of the output is non-deterministic. One possible outcome is D1, D2, D3, A1, A2, ...

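Note that we’re using IO here because parJoin requires a concurrent effect, i.e. a Concurrent[F] instance (with cats-effect 2, Concurrent[IO] needs an implicit ContextShift[IO] in scope).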
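Here is a runnable version of the parJoin example, as a sketch assuming fs2 2.x with cats-effect 2 (the object name is illustrative). The interleaving changes from run to run, but every element is emitted and each inner stream still emits its own elements in order:

```scala
import cats.effect.{ContextShift, IO}
import fs2.Stream
import scala.concurrent.ExecutionContext

object ParJoinExample {
  // cats-effect 2: Concurrent[IO], required by parJoin, is derived from a ContextShift[IO]
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)

  val results: List[String] =
    Stream.emits('A' to 'E').covary[IO]
      .map(letter => Stream.emits(1 to 3).covary[IO].map(index => s"$letter$index"))
      .parJoin(5) // runs up to 5 inner streams at the same time
      .compile
      .toList
      .unsafeRunSync()
}
```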

Alternatively, if you want to consume the streams in a breadth-first fashion (one element of each stream in turn), you have to do a little more work yourself (I’m not aware of anything usable out of the box):

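import cats.effect.Sync
import cats.implicits._
import fs2.Stream

// Emits one "round" per step: the current head of every inner stream
def breadthFirst[F[_]: Sync, E](streams: Stream[F, Stream[F, E]]): Stream[F, Stream[F, E]] =
  Stream.unfoldEval(streams) { streams =>
    val values = streams.flatMap(_.head) // get the head of each stream
    val next = streams.map(_.tail)       // continue with the tails
    // evaluate the heads once and stop when there are no more values
    values.compile.toList.map(heads => if (heads.isEmpty) None else Some(Stream.emits(heads).covary[F] -> next))
  }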
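For the special case of exactly two streams, fs2 does ship interleave and interleaveAll, which alternate between two streams one element at a time. A short pure sketch, assuming fs2 2.x (names are illustrative), showing that both infinite streams make progress:

```scala
import fs2.{Pure, Stream}

object InterleaveExample {
  def letters(letter: Char): Stream[Pure, String] =
    Stream.iterate(1)(_ + 1).map(index => s"$letter$index")

  // interleave alternates between the two streams and stops when either ends,
  // so even with infinite streams the B elements are reached
  val alternated: List[String] = letters('A').interleave(letters('B')).take(6).toList

  // interleaveAll keeps emitting the remaining stream once the other one is exhausted
  val uneven: List[Int] = Stream(1, 2, 3).interleaveAll(Stream(4, 5, 6, 7)).toList
}
```

For more than two streams, folding with interleave doesn’t give a fair round-robin, which is why a helper like breadthFirst is still needed.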

Batching

As with Akka Streams, batching is straightforward with built-in methods:

Stream.emits(1 to 100).chunkN(10) // chunks of (at most) 10 elements
  .evalMap(chunk => IO(println(chunk))).compile.drain.unsafeRunSync()

A Chunk is a finite sequence of values that is used by fs2 streams internally:

val s = Stream(1, 2) ++ Stream(3) ++ Stream(4, 5, 6)
val chunks = s.chunks.toList // List(Chunk(1, 2), Chunk(3), Chunk(4, 5, 6))
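A small pure sketch of how chunkN cuts a stream, assuming fs2 2.x (the object name is illustrative). By default the last chunk may be smaller than the requested size; with allowFewer = false that incomplete trailing chunk is dropped:

```scala
import fs2.Stream

object ChunkExample {
  // 25 elements in chunks of 10: the last chunk only holds the 5 remaining elements
  val sizes: List[Int] = Stream.emits(1 to 25).chunkN(10).map(_.size).toList

  // With allowFewer = false the incomplete trailing chunk is dropped
  val fullOnly: List[Int] = Stream.emits(1 to 25).chunkN(10, allowFewer = false).map(_.size).toList

  // A chunk can be converted back to a regular collection
  val firstBatch: List[Int] = Stream.emits(1 to 25).chunkN(10).head.toList.flatMap(_.toList)
}
```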

And if you want to batch within a specific time window, groupWithin is what you need:

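Stream.awakeEvery[IO](10.millis)   // emits an element every 10 milliseconds
  .groupWithin(100, 100.millis)    // batches of at most 100 elements or 100 ms, whichever comes first
  .evalTap(chunk => IO(println(s"Processing batch of ${chunk.size} elements")))
  .interruptAfter(1.second)        // the stream is infinite: stop it after 1 second
  .compile
  .drain
  .unsafeRunSync()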
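To see that groupWithin is bounded by size as well as by time, here is a sketch with a finite stream, assuming fs2 2.x with cats-effect 2 (object name illustrative). All the elements are available immediately, so the batches are expected to be cut by the size limit rather than by the one second window; the assertions below only rely on no element being lost and no batch exceeding 100 elements:

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object GroupWithinExample {
  // cats-effect 2: groupWithin needs Concurrent[IO] (derived from a ContextShift) and a Timer[IO]
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  val batchSizes: List[Int] =
    Stream.emits(1 to 250).covary[IO]
      .groupWithin(100, 1.second)
      .map(_.size)
      .compile
      .toList
      .unsafeRunSync()
}
```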

Asynchronous computation

Here fs2 clearly has the advantage, as asynchronicity is handled directly by the effect type F, which must have an Async[F] instance in scope.

It offers a whole family of eval methods: evalMap, evalTap, evalScan, evalMapAccumulate.
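A quick sketch of three of these methods on the same stream, assuming fs2 2.x with cats-effect 2 (object name illustrative). Note that evalScan, like scan, also emits its initial value:

```scala
import cats.effect.IO
import fs2.Stream

object EvalExample {
  val numbers: Stream[IO, Int] = Stream.emits(1 to 5).covary[IO]

  // evalMap replaces each element with the result of an effect
  val doubled: List[Int] = numbers.evalMap(n => IO(n * 2)).compile.toList.unsafeRunSync()

  // evalTap runs an effect for each element but emits the element unchanged
  val tapped: List[Int] = numbers.evalTap(n => IO(println(s"saw $n"))).compile.toList.unsafeRunSync()

  // evalScan is an effectful scan: the initial value, then every intermediate result
  val runningSums: List[Int] = numbers.evalScan(0)((acc, n) => IO(acc + n)).compile.toList.unsafeRunSync()
}
```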

If you want to run asynchronous effects in parallel, the effect type must have a Concurrent[F] instance in scope. If that’s the case, the parEval methods, such as parEvalMap, become available.

Let’s reuse the example where a program writes asynchronously to a database:

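import cats.effect.Async
import fs2.Chunk

// Simulated asynchronous write: the callback is invoked once the "write" is done
def writeToDatabase[F[_]: Async](chunk: Chunk[Int]): F[Unit] =
  Async[F].async[Unit] { callback =>
    println(s"Writing batch of $chunk to database by ${Thread.currentThread().getName}")
    callback(Right(()))
  }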

we can then write batches in parallel to the database with

fs2.Stream.emits(1 to 10000)
  .chunkN(10)
  .covary[IO]
  .parEvalMap(10)(writeToDatabase[IO])
  .compile
  .drain
  .unsafeRunSync()

Note that parEvalMap preserves the stream ordering. If this is not required there is a parEvalMapUnordered method.
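A sketch of the difference, assuming fs2 2.x with cats-effect 2 (object and function names are illustrative). Each element sleeps for a shorter time than the previous one, so later elements complete first; parEvalMap still emits them in the original order, while parEvalMapUnordered emits them as they complete:

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object OrderingExample {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Later elements complete first: 1 sleeps 50ms, 5 sleeps only 10ms
  def slowIdentity(n: Int): IO[Int] = IO.sleep((60 - n * 10).millis).map(_ => n)

  val ordered: List[Int] =
    Stream.emits(1 to 5).covary[IO].parEvalMap(5)(slowIdentity).compile.toList.unsafeRunSync()

  val unordered: List[Int] =
    Stream.emits(1 to 5).covary[IO].parEvalMapUnordered(5)(slowIdentity).compile.toList.unsafeRunSync()
}
```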

If you’d like some consistency with the Akka Streams API, you’ll be glad to know that there are mapAsync and mapAsyncUnordered methods, which are just aliases for parEvalMap and parEvalMapUnordered respectively.

Concurrency

In fs2, async boundaries are controlled directly by the effect computations. Let’s consider an example similar to the Akka Streams one, where a stream runs through a series of stages (or pipes):

import cats.effect.{IO, Sync}
import fs2.Stream

// A stage that prints the element it processes and the thread it runs on
def pipe[F[_]: Sync](name: String): Stream[F, Int] => Stream[F, Int] =
  _.evalTap { index =>
    Sync[F].delay(
      println(s"Stage $name processing $index by ${Thread.currentThread().getName}")
    )
  }

Stream.emits(1 to 10000)
  .covary[IO]
  .through(pipe[IO]("A"))
  .through(pipe[IO]("B"))
  .through(pipe[IO]("C"))
  .compile
  .drain
  .unsafeRunSync()

As expected, this program uses a single thread and each element is processed sequentially through the pipes.
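The sequential behaviour can be checked by recording the order in which the stages see the elements instead of printing it. A sketch assuming fs2 2.x with cats-effect 2 (recordingPipe and StageOrder are illustrative names): each element goes through A, B and C before the next element enters stage A.

```scala
import cats.effect.{IO, Sync}
import fs2.Stream
import scala.collection.mutable.ListBuffer

object StageOrder {
  private val log = ListBuffer.empty[String]

  // Same shape as the pipe above, but records "<stage><element>" instead of printing
  def recordingPipe[F[_]: Sync](name: String): Stream[F, Int] => Stream[F, Int] =
    _.evalTap(index => Sync[F].delay { log += s"$name$index"; () })

  // Runs two elements through stages A, B and C and returns the recorded order
  val trace: List[String] = {
    Stream.emits(1 to 2)
      .covary[IO]
      .through(recordingPipe[IO]("A"))
      .through(recordingPipe[IO]("B"))
      .through(recordingPipe[IO]("C"))
      .compile
      .drain
      .unsafeRunSync()
    log.toList
  }
}
```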

Now if we change our pipe definition to

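import cats.effect.{ContextShift, IO, LiftIO, Sync}

def pipe[F[_]](name: String)(implicit F: Sync[F], L: LiftIO[F], cs: ContextShift[IO]): Stream[F, Int] => Stream[F, Int] =
  _.evalTap { index =>
    // IO.shift introduces an async boundary (using the ContextShift); runAsync fires the IO without
    // waiting for it and, in cats-effect 2, returns a SyncIO, hence toIO before lifting into F
    IO.shift.flatMap(_ => IO(println(s"Stage $name processing $index by ${Thread.currentThread().getName}")))
      .runAsync(_ => IO.unit).toIO.to[F]
  }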

IO.shift introduces an async boundary, giving the computation a chance to continue on another thread. The runAsync method starts the computation without waiting for its result.

If applied to the same stream as before, the elements are still processed in order (processing of 1 starts before 2, which starts before 3, and so on) and the stages A, B and C are also started in order (A starts, then B, then C).

However, we no longer wait for each stage to finish, and as different threads are used, the execution becomes non-deterministic.

Throttling

Fs2 provides a way to create a stream that emits an element at a fixed interval. Zipped with another stream, it limits the rate of that second stream:

Stream.awakeEvery[IO](1.second) zipRight Stream.emits(1 to 100)

As it’s a very common pattern, fs2 provides a metered method that does just that.
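A sketch comparing the two forms, assuming fs2 2.x with cats-effect 2 (MeteredExample and timed are illustrative names). Both emit the three elements without dropping any, and both take at least a couple of 100ms intervals to do so:

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object MeteredExample {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Runs a stream and returns its elements with the elapsed time in milliseconds
  def timed[A](s: Stream[IO, A]): (List[A], Long) = {
    val start = System.nanoTime()
    val elements = s.compile.toList.unsafeRunSync()
    (elements, (System.nanoTime() - start) / 1000000)
  }

  // One element per 100ms tick; nothing is dropped
  val (zipped, zippedMillis) = timed(Stream.awakeEvery[IO](100.millis) zipRight Stream.emits(1 to 3))
  val (metered, meteredMillis) = timed(Stream.emits(1 to 3).covary[IO].metered(100.millis))
}
```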

If, instead of limiting the rate of the stream, you prefer to discard some elements, you can use debounce:

val ints = Stream.constant[IO, Int](1).scan1(_ + _) // 1, 2, 3, ...
ints.debounce(1.second)

This emits at most one element per second (the most recent one), discarding every element produced in between.
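A sketch with two bursts separated by a pause, assuming fs2 2.x with cats-effect 2 (the object name is illustrative): with a 100ms debounce, only the latest element of each burst survives.

```scala
import cats.effect.{ContextShift, IO, Timer}
import fs2.Stream
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object DebounceExample {
  implicit val cs: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  // Two bursts: 1, 2, 3, then a 500ms pause, then 4, 5 and 6 in quick succession
  val bursts: Stream[IO, Int] =
    Stream(1, 2, 3) ++ Stream.sleep_[IO](500.millis) ++ Stream(4, 5) ++ Stream.sleep_[IO](10.millis) ++ Stream(6)

  // Only the latest element of each burst is emitted
  val debounced: List[Int] = bursts.debounce(100.millis).compile.toList.unsafeRunSync()
}
```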

Alternatively, if you want to accept the first X elements emitted during a time interval and discard any other element until the end of that interval, it’s a little more involved, as nothing comes out of the box.

For example, with a rate of 100 elements per second, we keep the first 100 elements and then discard any other element until the next second starts.

val ints = Stream.constant[IO, Int](1).scan1(_ + _) // 1, 2, 3, ...
val ticks = fs2.Stream.every[IO](1.second) // continuously emits false, and true once every second
val rate = 100 // 100 elements per second
val throttledInts = ints.zip(ticks).scan((0, rate + 1)) {
  case (_, (n, true)) => (n, 0) // new second start, emit element and reset counter
  case ((_, count), (n, _)) => (n, count+1) // emit elements and increment counter
}
.filter(_._2 < rate) // keep only the elements where counter is less than rate
.map(_._1) // remove counter
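The counter logic can be checked without waiting by driving it with a simulated tick stream. A pure sketch assuming fs2 2.x (ThrottleLogic and keepFirstPerWindow are illustrative names): it generalises the scan above with an Option instead of the 0 placeholder, and true marks the start of a new window.

```scala
import fs2.{Pure, Stream}

object ThrottleLogic {
  // Same counter logic as above, driven by a simulated tick stream
  def keepFirstPerWindow[A](elements: Stream[Pure, A], ticks: Stream[Pure, Boolean], rate: Int): List[A] =
    elements.zip(ticks)
      .scan((Option.empty[A], rate + 1)) {
        case (_, (a, true))           => (Some(a), 0)         // new window: reset the counter
        case ((_, count), (a, false)) => (Some(a), count + 1) // same window: increment the counter
      }
      .collect { case (Some(a), count) if count < rate => a } // keep the first `rate` elements per window
      .toList

  // Windows of 5 elements, at most 2 elements kept per window
  val kept: List[Int] =
    keepFirstPerWindow(Stream.emits(1 to 10), Stream(true, false, false, false, false).repeat, rate = 2)
}
```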

Idle timeout

Akka Streams has an idleTimeout method that fails a stream if no element is emitted within a given timeout.

Fs2 doesn’t provide anything similar, but it can be implemented in a few lines:

def idleTimeout[F[_], A](
  s: fs2.Stream[F, A],
  timeout: FiniteDuration
)(
  implicit F: Concurrent[F],
  timer: Timer[F]
): fs2.Stream[F, A] =
  s.groupWithin(1, timeout).evalMap(
    _.head.fold[F[A]](F.raiseError(new Exception("timeout")))(F.pure)
  )

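The idea is to use groupWithin and then check each Chunk: if it contains an element, we emit it; otherwise the chunk is empty, so we raise the error. Be aware that this relies on groupWithin emitting an empty chunk when the timeout elapses without any element; if your version of groupWithin only emits non-empty chunks, the error is never raised.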
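An alternative that doesn’t depend on how groupWithin treats empty windows is to run a watchdog next to the stream. This is a sketch assuming fs2 2.x with cats-effect 2 (Timer, ContextShift and cats.effect.concurrent.Ref); the object name, the polling period (a tenth of the timeout) and the use of TimeoutException are illustrative choices, not an fs2 API. Every element records the current monotonic time in a Ref, and a background stream started with concurrently fails as soon as the last element is older than the timeout, which fails the whole stream:

```scala
import cats.effect.{Concurrent, ContextShift, IO, Timer}
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.Stream
import scala.concurrent.{ExecutionContext, TimeoutException}
import scala.concurrent.duration._

object IdleTimeoutWatchdog {
  // cats-effect 2 instances for the IO examples below (declared first so they are initialised first)
  implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
  implicit val ioTimer: Timer[IO] = IO.timer(ExecutionContext.global)

  def idleTimeout[F[_], A](s: Stream[F, A], timeout: FiniteDuration)(
      implicit F: Concurrent[F], timer: Timer[F]): Stream[F, A] = {
    val now: F[Long] = timer.clock.monotonic(NANOSECONDS)
    Stream.eval(now.flatMap(t => Ref.of[F, Long](t))).flatMap { lastSeen =>
      // Periodically checks how long ago the last element went through
      val watchdog: Stream[F, Unit] =
        Stream.awakeEvery[F](timeout / 10).evalMap { _ =>
          (now, lastSeen.get).mapN(_ - _).flatMap { idleNanos =>
            if (idleNanos > timeout.toNanos) F.raiseError[Unit](new TimeoutException(s"No element within $timeout"))
            else F.unit
          }
        }
      // Records the time of every element; a failure of the watchdog fails the resulting stream
      s.evalTap(_ => now.flatMap(lastSeen.set)).concurrently(watchdog)
    }
  }

  // A 500ms gap between two elements exceeds the 100ms idle timeout
  val timedOut: Either[Throwable, List[Int]] =
    idleTimeout(Stream(1) ++ Stream.sleep_[IO](500.millis) ++ Stream(2), 100.millis)
      .compile.toList.attempt.unsafeRunSync()

  // A stream that never pauses for long completes normally
  val completed: Either[Throwable, List[Int]] =
    idleTimeout(Stream.emits(1 to 3).covary[IO], 1.second).compile.toList.attempt.unsafeRunSync()
}
```

As the watchdog only polls, the error is raised up to one polling period after the timeout has actually elapsed; a shorter period makes it more precise at the cost of more wake-ups.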

Error Handling

Any exception raised during stream processing (or an explicit call to Stream.raiseError) terminates the stream with an error. Error handling is similar to the way cats does it:

Stream.raiseError[IO](new Exception("Oops"))
  .handleErrorWith { error =>
    Stream(error.getMessage)
  }
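A sketch showing what happens to the elements emitted before the error, assuming fs2 2.x with cats-effect 2 (the object name is illustrative). handleErrorWith keeps them and then switches to the fallback stream, while attempt turns the error into a final Left element:

```scala
import cats.effect.IO
import fs2.Stream

object ErrorHandlingExample {
  val failing: Stream[IO, Int] = Stream(1, 2) ++ Stream.raiseError[IO](new Exception("Oops"))

  // Elements emitted before the error are kept, then the fallback stream takes over
  val recovered: List[Int] = failing.handleErrorWith(_ => Stream(-1)).compile.toList.unsafeRunSync()

  // The error becomes a Left element and the stream ends right after it
  val attempted: List[Either[Throwable, Int]] = failing.attempt.compile.toList.unsafeRunSync()
}
```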

Fs2 also provides a flexible way to deal with retries, be it a fixed number of attempts, with or without backoff. You can retry a single effect (or a whole stream, once it is compiled into a single effect):

val ints = (
  Stream.emits(1 to 10) ++ Stream.raiseError[IO](new Exception("the end"))
).covary[IO].evalTap(n => IO(println(n))).compile.drain

Stream.retry(
  ints,
  delay = 1.second,      // delay before the first retry
  nextDelay = _ * 2,     // doubles the delay for every retry
  maxAttempts = 5,       // after 5 failed attempts the error is propagated
  retriable = _ => true  // retry on any error
).compile.drain.unsafeRunSync()
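A sketch where the retried effect eventually succeeds, assuming fs2 2.x with cats-effect 2 (RetryExample and flaky are illustrative names). The effect fails on its first two runs, so Stream.retry runs it three times and emits the successful result:

```scala
import cats.effect.{IO, Timer}
import fs2.Stream
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object RetryExample {
  implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

  private val attempts = new AtomicInteger(0)

  // Fails on the first two runs and succeeds on the third one
  val flaky: IO[String] = IO(attempts.incrementAndGet()).flatMap { n =>
    if (n < 3) IO.raiseError[String](new Exception(s"attempt $n failed"))
    else IO.pure(s"succeeded on attempt $n")
  }

  val result: List[String] =
    Stream.retry(flaky, delay = 10.millis, nextDelay = _ * 2, maxAttempts = 5)
      .compile.toList.unsafeRunSync()

  val attemptCount: Int = attempts.get()
}
```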

Handling resources

As clearly stated in the fs2 documentation, error handling is not meant for freeing up resources. For that purpose, fs2 provides a much safer mechanism based on the concept of bracket.

That probably sounds familiar as it’s the same concept as provided by cats-effect.

Using bracket makes sure that the resources are always freed whatever the outcome of the computation:

import cats.implicits._
import scala.util.Random

val acquire = IO(println("Acquiring resource")) *> IO(new Random())
val release = (_: Random) => IO(println("Releasing resource"))
Stream.bracket(acquire)(release).flatMap { rand =>
  fs2.Stream.emits(1 to 10).map(_ => rand.nextInt())
}.evalTap(n => IO(println(n))).compile.drain.unsafeRunSync()

There are other bracket-like methods, such as resource (which takes a cats-effect Resource directly) and bracketCase (which lets the release phase know how the computation ended).
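A sketch of Stream.resource showing that the release step still runs when the stream fails while using the resource, assuming fs2 2.x with cats-effect 2 (the object name and the string standing in for a real connection are illustrative):

```scala
import cats.effect.{IO, Resource}
import fs2.Stream
import scala.collection.mutable.ListBuffer

object ResourceExample {
  val log: ListBuffer[String] = ListBuffer.empty

  val acquire: IO[String] = IO { log += "acquire"; "connection" }
  def release(r: String): IO[Unit] = IO { log += s"release $r"; () }

  // The stream fails after using the resource, yet release still runs
  val outcome: Either[Throwable, Unit] =
    Stream.resource(Resource.make(acquire)(release))
      .flatMap(r => Stream.eval(IO { log += s"use $r"; () }) ++ Stream.raiseError[IO](new Exception("boom")))
      .compile
      .drain
      .attempt
      .unsafeRunSync()
}
```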

Conclusion

I found the fs2 API both simple and complete enough to write powerful applications. It might not cover every case, but the building blocks are powerful enough to let you write code that suits your own needs.

Embedding the effect inside the stream is a powerful concept, as it allows you to express your whole computation as a stream (including the side effects) and then simply run it with compile.drain.

Fs2 has been a pleasure to work with so far and I prefer it over Akka Streams (it requires neither an ActorSystem nor Scala Futures). It is flexible enough to work with any effect and its building blocks are easy to combine. Add thorough documentation on top of that, and what’s left to ask for?