Scala exercises: cats-effect

There’s a lot of information on cats-effect these days, but most of it covers concrete use-cases, and there’s not much for those who know the basics yet don’t feel confident enough with the library to build full-fledged applications. I have, however, seen a couple of questions that generalize well and are more complex and interesting than the examples provided in the documentation. So, I decided to turn them into exercises.

If you know how to do FP in Scala, and a bit about cats and cats-effect, arriving at an initial solution shouldn’t take you more than an hour. If you struggle to find one, each exercise comes with a couple of hints. And for those who want to dive deeper, there are bonus assignments that require additional generalization and/or refactoring.

Currently, there are just two. I plan to slowly build up the list as I solve more interesting problems, mine or others’. All solutions:

  • Must work on both JVM and JS. Don’t use platform-specific APIs, in particular blocking APIs.
  • Must not use impure methods (e.g. no unsafeRun).
    • I recommend using IOApp to run everything, but in a worksheet environment it’s fine to do an unsafeRun at the very end.
    • Any call to an impure stdlib method (e.g. Random.nextInt()) must be suspended in IO (see the snippet after this list).
    • Don’t use vars, stdlib atomics, mutable collections, or Futures, even suspended.
  • Must not use extra libraries besides cats and cats-effect.
    • Exception: you can use other effect types like SyncIO (where applicable), monix Coeval / Task with TaskApp or ZIO, either instead of cats’ IO or to test generalizations. However, all concurrency primitives (Ref, Deferred, MVar, etc.) should come from cats.effect package.
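
As an illustration of the suspension rule above, here’s the difference between a suspended call and one that escapes IO (roll and notSuspended are illustrative names):

import scala.util.Random
import cats.effect.IO

// Suspended: the side effect runs each time `roll` is executed.
val roll: IO[Int] = IO(Random.nextInt(6))

// Not suspended: IO.pure evaluates its argument eagerly, so the random
// number is generated once, right here, outside of IO's control.
val notSuspended: IO[Int] = IO.pure(Random.nextInt(6))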

To jumpstart everything, a source sample is provided for each exercise. You are free to change anything; there are no restrictions on code style as long as it is readable. I recommend sticking to the provided type signatures / interface structure for the parts you will be implementing, unless the bonuses require you to change them.

To prevent spoilers, post your answers as GitHub repos or gists, and link them in the comments or @ me on Twitter (link in the footer). Gists are easy to leave comments on, too!

Worker pool with load balancing

Objective

Do parallel processing, distributed over a limited number of workers, each with its own state (counters, DB connections, etc.).

Requirements

  • Processing jobs must run in parallel.
  • Submitting a processing request must wait if all workers are busy.
  • Submission should do load balancing: wait for the first worker to finish, not for a specific one.
  • A worker should become available whenever a job is completed, whether successfully, with an exception, or by cancellation.

Assume the number of workers is not very large (<= 100).

Start (on Scastie)

import scala.util.Random
import scala.concurrent.duration._
import cats._
import cats.implicits._
import cats.effect.{IO, Timer}
import cats.effect.concurrent.Ref

// To start, our requests can be modelled as simple functions.
// You might want to replace this type with a class if you go for bonuses. Or not.
type Worker[A, B] = A => IO[B]

// Sample stateful worker that keeps count of requests it has accepted
def mkWorker(id: Int)(implicit timer: Timer[IO]): IO[Worker[Int, Int]] =
  Ref[IO].of(0).map { counter =>
    def simulateWork: IO[Unit] =
      IO(50 + Random.nextInt(450)).map(_.millis).flatMap(IO.sleep)

    def report: IO[Unit] =
      counter.get.flatMap(i => IO(println(s"Total processed by $id: $i")))

    x => simulateWork >>
      counter.update(_ + 1) >>
      report >>
      IO.pure(x + 1)
  }

trait WorkerPool[A, B] {
  def exec(a: A): IO[B]
}

object WorkerPool {
  // Implement this constructor, and, correspondingly, the interface above.
  // You are free to use named or anonymous classes
  def of[A, B](fs: List[Worker[A, B]]): IO[WorkerPool[A, B]] = ???
}

// Sample test pool to play with in IOApp
val testPool: IO[WorkerPool[Int, Int]] =
  List.range(0, 10)
    .traverse(mkWorker)
    .flatMap(WorkerPool.of)
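
For reference, one hypothetical way to exercise the pool in an IOApp (Main and the job count are illustrative; this assumes the definitions and imports above are in scope inside the app, which also supplies the implicit Timer and ContextShift):

import cats.effect.{ExitCode, IO, IOApp}

object Main extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    testPool.flatMap { pool =>
      // 40 jobs across 10 workers: at most 10 should run at any moment.
      List.range(0, 40).parTraverse(pool.exec).void
    }.as(ExitCode.Success)
}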

Hints

  • Relying on a concurrent queue might be a good idea. And MVar is essentially a one-element queue.
  • Because our workers are functions of type A => IO[B], we can freely do anything effectful before and after running the function.
  • Our factory method (of on the companion) returns IO too. This lets us create a shared MVar and do pre-initialization, if needed.
Heavy spoiler: Put free workers into an MVar; all workers should be free on init. Once a worker is done processing, guarantee that it puts itself back onto the MVar. And we need to NOT wait for that put to complete, so use start and discard the resulting fiber.
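
To make the spoiler concrete, here is a compact sketch using the cats-effect 1.x API (note it threads a ContextShift, which the starter signature doesn’t have; your solution may well differ):

import cats.implicits._
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.MVar

def of[A, B](fs: List[Worker[A, B]])(implicit cs: ContextShift[IO]): IO[WorkerPool[A, B]] =
  for {
    // The MVar holds at most one free worker; further puts block until a take.
    free <- MVar.empty[IO, Worker[A, B]]
    // Fire-and-forget the initial puts so that construction doesn't block.
    _    <- fs.traverse_(w => free.put(w).start)
  } yield new WorkerPool[A, B] {
    def exec(a: A): IO[B] =
      free.take.flatMap { w =>
        // guarantee covers success, failure and cancellation; the put runs
        // on its own fiber so we don't wait for the next taker.
        w(a).guarantee(free.put(w).start.void)
      }
  }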

Bonus

  • Generalize for any F using the Concurrent typeclass.
  • Add methods to the WorkerPool interface for adding workers on the fly and removing all workers. If all workers are removed, submitted jobs must wait until one is added. (One possible interface shape is sketched after this list.)
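
One possible shape for the generalized interface from the bonus (the method names are my own, not prescribed):

import cats.effect.Concurrent

type Worker[F[_], A, B] = A => F[B]

trait WorkerPool[F[_], A, B] {
  def exec(a: A): F[B]
  def addWorker(w: Worker[F, A, B]): F[Unit]   // hypothetical name
  def removeAllWorkers: F[Unit]                // hypothetical name
}

object WorkerPool {
  def of[F[_]: Concurrent, A, B](fs: List[Worker[F, A, B]]): F[WorkerPool[F, A, B]] = ???
}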

Race for success

Objective

Quickly obtain data which can be requested from multiple sources of unknown latency (databases, caches, network services, etc.).

Requirements

  • The function should run requests in parallel.
  • The function should wait for the first request to complete successfully.
  • Once the first request has completed, everything that is still in-flight must be cancelled.
  • If all requests have failed, all errors should be reported for better debugging.

Assume that there will be <= 32 providers and that none of them block OS threads for I/O.

Start (on Scastie)

import scala.util.Random
import scala.concurrent.duration._
import cats._
import cats.data._
import cats.implicits._
import cats.effect.{IO, Timer, ExitCase}

case class Data(source: String, body: String)

def provider(name: String)(implicit timer: Timer[IO]): IO[Data] = {
  val proc = for {
    dur <- IO { Random.nextInt(500) }
    _   <- IO.sleep { (100 + dur).millis }
    _   <- IO { if (Random.nextBoolean()) throw new Exception(s"Error in $name") }
    txt <- IO { Random.alphanumeric.take(16).mkString }
  } yield Data(name, txt)
  
  proc.guaranteeCase {
    case ExitCase.Completed => IO { println(s"$name request finished") }
    case ExitCase.Canceled  => IO { println(s"$name request canceled") }
    case ExitCase.Error(ex) => IO { println(s"$name errored: ${ex.getMessage}") }
  }
}

// Use this class for reporting all failures.
case class CompositeException(ex: NonEmptyList[Throwable]) extends Exception("All race candidates have failed")

// Implement this function:
def raceToSuccess[A](ios: NonEmptyList[IO[A]]): IO[A] = ???

// In your IOApp, you can use the following sample method list

val methods: NonEmptyList[IO[Data]] = NonEmptyList.of(
  "memcached",
  "redis",
  "postgres",
  "mongodb",
  "hdd",
  "aws"
).map(provider)
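
For instance, a hypothetical wiring (Main is an illustrative name; this assumes the definitions above are in scope inside the app, which supplies the implicit Timer):

import cats.effect.{ExitCode, IO, IOApp}

object Main extends IOApp {
  def run(args: List[String]): IO[ExitCode] =
    raceToSuccess(methods)
      .flatMap(data => IO(println(s"Got data from ${data.source}")))
      .handleErrorWith {
        case CompositeException(errors) =>
          IO(println(s"All ${errors.size} providers failed"))
        case other =>
          IO(println(s"Unexpected error: $other"))
      }
      .as(ExitCode.Success)
}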

Hints

There are two operators we’re interested in: race and racePair. Both run two tasks in parallel; the difference is what happens after one of them completes. In the case of race, the loser is automatically cancelled. In the case of racePair, we get to choose what to do, and the still-running task is represented by a Fiber.
Heavy spoiler: Using racePair, try folding/reducing the list: race the previous result against the next one, both attempted; if one yields a successful (as in, Right) result, cancel the other and return that result. Otherwise, fall back to the second one, accumulating the errors along the way. The result should be something like Either[List[Throwable], A]. Then transform the list into an exception and use .rethrow to lift it back into IO.
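
Spelled out, the spoiler could look like the following sketch (cats-effect 1.x API; it accumulates errors in a NonEmptyList rather than a List, and it does not handle the first bonus, cancellation of the outer IO):

import cats.data.NonEmptyList
import cats.implicits._
import cats.effect.{ContextShift, IO}

def raceToSuccess[A](ios: NonEmptyList[IO[A]])(implicit cs: ContextShift[IO]): IO[A] = {
  type Res = Either[NonEmptyList[Throwable], A]

  def attempted(io: IO[A]): IO[Res] =
    io.attempt.map(_.leftMap(NonEmptyList.one))

  ios
    .map(attempted)
    .reduceLeft { (l, r) =>
      IO.racePair(l, r).flatMap {
        // One side succeeded: cancel the loser and short-circuit.
        case Left((Right(a), fiber))  => fiber.cancel.as(Right(a): Res)
        case Right((fiber, Right(a))) => fiber.cancel.as(Right(a): Res)
        // One side failed: wait for the other, accumulating errors.
        case Left((Left(es), fiber))  => fiber.join.map(_.leftMap(es ::: _))
        case Right((fiber, Left(es))) => fiber.join.map(_.leftMap(_ ::: es))
      }
    }
    .flatMap {
      case Right(a) => IO.pure(a)
      case Left(es) => IO.raiseError(CompositeException(es))
    }
}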

Bonus

  • If the returned IO is cancelled, all in-flight requests should be properly cancelled as well.
  • Refactor the function to allow a generic effect type to be used, not only cats’ IO (e.g. anything with an Async or Concurrent instance).
  • Refactor the function to allow a generic container type to be used (e.g. anything with a Traverse or NonEmptyTraverse instance); possible signatures are sketched after this list.
    • Don’t use toList. If you have to work with lists anyway, might as well push the conversion responsibility to the caller.
    • If you want to support collections that might be empty (List, Vector, Option), the function must result in a failing IO/F when passed an empty value.
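
Possible generalized signatures for the last two bonus points (raceToSuccessOrFail is a hypothetical name; note that Reducible suffices for a reduce-based solution, since NonEmptyTraverse extends it):

import cats.{Foldable, Reducible}
import cats.effect.Concurrent

// Non-empty containers (NonEmptyList, NonEmptyVector, ...).
def raceToSuccess[F[_]: Concurrent, T[_]: Reducible, A](tfa: T[F[A]]): F[A] = ???

// Possibly-empty containers: must fail in F when passed an empty value.
def raceToSuccessOrFail[F[_]: Concurrent, T[_]: Foldable, A](tfa: T[F[A]]): F[A] = ???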
Written on December 3, 2018