10

Here is some Scala cats code using the IO Monad:

import java.util.concurrent.{ExecutorService, Executors}

import cats.effect.IO

import scala.concurrent.{ExecutionContext, ExecutionContextExecutor}
import scala.util.control.NonFatal

object Program extends App {

  type CallbackType = (Either[Throwable, Unit]) => Unit

  // IO.async[Unit] is like a Future that returns Unit on completion.
  // Unlike a regular Future, it doesn't start to run until unsafeRunSync is called.
  def forkAsync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.async[Unit] { callback: CallbackType =>
    // "callback" is a function that either takes a throwable (Left) or whatever toRun returns (Right).
    println("LalalaAsync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        val nothing: Unit = toRun() // Note: This line executes the body and returns nothing, which is of type Unit.
        try {
          callback(Right(nothing)) // On success, the callback returns nothing
        } catch {
          case NonFatal(t) => callback(Left(t)) // On failure, it returns an exception
        }
      }
    })
  }

  def forkSync(toRun: () => Unit)(executor: ExecutorService): IO[Unit] = IO.apply {
    println("LalalaSync: " + Thread.currentThread().getName)
    executor.execute(new Runnable {
      def run(): Unit = {
        toRun()
      }
    })
  }

  val treadPool: ExecutorService = Executors.newSingleThreadExecutor()
  val mainThread: Thread = Thread.currentThread()

  val Global: ExecutionContextExecutor = ExecutionContext.global

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
   */
  val program = for {
    _ <- IO {
      println("1 Hello World printed synchronously from Main." + Thread.currentThread().getName) // "main" thread
    }
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- forkSync { () =>
      println("Hello World printed synchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- forkAsync { () =>
      println("Hello World printed asynchronously from thread pool." + Thread.currentThread().getName) // "pool-1-thread-1" thread
    }(treadPool)
    _ <- IO.shift(Global) // Shift to Global Execution Context
    _ <- IO {
      println("2 Hello World printed synchronously from Global ." + Thread.currentThread().getName) // "scala-execution-context-global-13" thread
    }
  } yield ()

  program.unsafeRunSync()
}

To run it you would need to add:

libraryDependencies ++= Seq(
  "org.typelevel" %% "cats" % "0.9.0",
  "org.typelevel" %% "cats-effect" % "0.3"
),

To your build.sbt file.

Note the output:

  /*
  Output:
    1 Hello World printed synchronously from Main.main
    LalalaSync: scala-execution-context-global-12
    Hello World printed synchronously from thread pool.pool-1-thread-1
    LalalaAsync: scala-execution-context-global-12
    Hello World printed asynchronously from thread pool.pool-1-thread-1
    2 Hello World printed synchronously from Global .scala-execution-context-global-12
 */

Basically, I don't understand how IO.shift(Global) or how IO.async works.

For example, why is it that after I call "forkAsync", if I don't call "IO.shift(Global)", the subsequent synchronous IO objects are run in "pool-1-thread-1". Also, what is the difference between forkAsync and forkSync in this example? Both of them start in the ExecutionContext.global and then execute a Runnable in "pool.pool-1-thread-1".

Like are forkAsync and forkSync doing the exact same thing or is forkAsync doing something different? If they are doing the same thing, what is the point of wrapping code in IO.async? If they are not doing the same thing, how are they different?

Michael Lafayette
  • 2,972
  • 3
  • 20
  • 54

1 Answers1

12

For example, why is it that after I call "forkAsync", if I don't call "IO.shift(Global)", the subsequent synchronous IO objects are run in "pool-1-thread-1".

The more important question is why would you expect it to evaluate the "subsequent synchronous IO objects" on global?

IO doesn't have internally the notion of thread-pools, it doesn't know about global, so it cannot shift back to your default thread-pool, therefore you need to trigger a manual shift indeed.

Upgrade to the latest version 1.0.0 and you also have evalOn in ContextShift which will execute an IO action on a specified thread-pool and then shift back to your "global", which I suppose is what you want.

Also, what is the difference between forkAsync and forkSync in this example?

Your forkSync triggers the execution of the Runnable, but does not wait for its completion. It's a fire and forget. Which means that subsequent chained actions will not do back-pressuring.

Some advice:

  1. upgrade to the latest version (1.0.0)
  2. read the docs at: https://typelevel.org/cats-effect/datatypes/io.html
Alexandru Nedelcu
  • 8,061
  • 2
  • 34
  • 39
  • So does this imply that my `forkAsync` is not fire and forget? Like does my `forkAsync` wait for the `run` method to complete? – Michael Lafayette Sep 21 '18 at 07:09
  • Ah, I see. `forkAsync` waits for run to complete but `forkSync` does not. Also, if you do IO.shift after `forkSync` it changes the thread that `forkSync` initially runs on, but `forkSync` continues to be fire and forget. – Michael Lafayette Sep 21 '18 at 07:18
  • And instead of `forkSync` I can just do `_ <- contextShift.evalOn(ec)(IO{Thread.sleep(1000); println("Hello World")})` and it will siphon this long-running IO onto treadPool and return to the default ExecutionContext. – Michael Lafayette Sep 21 '18 at 07:23
  • Wait a second. `contextShift.evalOn(ec)(IO{Thread.sleep(1000); println("Hello World")})` waits for my synchronous IO to complete before executing the next IO in the sequence. It is no longer fire and forget. Now it's basically the same thing as doing `forkAsync`. – Michael Lafayette Sep 21 '18 at 07:30
  • Okay, so if I can get the same "wait for completion" semantics from `contextShift.evalOn(IO.apply{...})`, and I can just shift to the thread that my IO.async block would use, then why would I ever use IO.async? Like can't I just replace calls to IO.async with blocking calls to IO.apply? – Michael Lafayette Sep 21 '18 at 07:38