I have a situation where I need to run a bunch of operations in parallel.

All operations have the same return value (say a Seq[String]).

It's possible that some of the operations fail while others successfully return results.

I want to return both the successful results, and any exceptions that happened, so I can log them for debugging.

Is there a built-in way, or an easy way through a library (cats/scalaz), to do this before I go and write my own class for it?

I was thinking of running each operation in its own future, then checking each future and returning a tuple (Seq[String], Seq[Throwable]), where the first element holds the successful results (flattened / combined) and the second is a list of any exceptions that occurred.

Is there a better way?

Ali
  • To the OP: the solution you accepted is incorrect, it will not wait for the result of the computation. If your future is not completed when this code runs, you will **never** see the result. You either have to wait until it completes or register an asynchronous callback like I suggested. – Dici Sep 22 '18 at 10:32
  • Already using Future.await before checking result. Thanks. – Ali Sep 22 '18 at 10:36
  • I can't see this method in the documentation (https://www.scala-lang.org/api/2.12.3/scala/concurrent/Future.html). Is that an implicit conversion? I've actually never used Scala futures and it seems like there's no equivalent of Java's `Future.get` from the doc – Dici Sep 22 '18 at 10:42
  • Meant Await.ready, sorry – Ali Sep 22 '18 at 10:45

4 Answers

Using Await.ready, which you mention in a comment, generally loses most of the benefits of using futures, because it blocks the calling thread. Instead, you can do this with just the normal Future combinators. Let's write the more generic version, which works for any result type; flattening the Seq[String]s can be added easily.

import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

def successesAndFailures[T](futures: Seq[Future[T]])(
    implicit ec: ExecutionContext): Future[(Seq[T], Seq[Throwable])] = {
  // first, turn every future into one that always succeeds with an Either
  val eitherFutures: Seq[Future[Either[Throwable, T]]] =
    futures.map(_.transform(x => Success(x.toEither)))
  // then sequence to flip Future and Seq
  val futureEithers: Future[Seq[Either[Throwable, T]]] =
    Future.sequence(eitherFutures)
  // finally, split the Seq of Eithers into successes (Right) and failures (Left)
  futureEithers.map { seqOfEithers =>
    val successes = seqOfEithers.collect { case Right(value) => value }
    val failures = seqOfEithers.collect { case Left(error) => error }
    (successes, failures)
  }
}

Scalaz and Cats both provide a separate method that simplifies the last step.

The compiler can infer the types; they are written out only to help you follow the logic.
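
To show how the flattening mentioned above could be added, here is a minimal usage sketch. It repeats the function in condensed form so that it runs on its own, and the three operations are made-up placeholders (two succeed, one fails). Await.result is used only to end the demo; in real code you would keep composing the returned Future.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Success

// Condensed copy of successesAndFailures, repeated so the snippet is self-contained.
def successesAndFailures[T](futures: Seq[Future[T]])(
    implicit ec: ExecutionContext): Future[(Seq[T], Seq[Throwable])] =
  Future
    .sequence(futures.map(_.transform(t => Success(t.toEither))))
    .map { eithers =>
      (eithers.collect { case Right(v) => v }, eithers.collect { case Left(e) => e })
    }

implicit val ec: ExecutionContext = ExecutionContext.global

// Hypothetical operations: two succeed, one fails.
val operations: Seq[Future[Seq[String]]] = Seq(
  Future(Seq("a", "b")),
  Future[Seq[String]](throw new IllegalStateException("boom")),
  Future(Seq("c"))
)

// Flatten the successful Seq[String] results into a single Seq[String].
val combined: Future[(Seq[String], Seq[Throwable])] =
  successesAndFailures(operations).map { case (oks, errs) => (oks.flatten, errs) }

// Blocking here is only for the demo, at the very edge of the program.
val (results, errors) = Await.result(combined, 5.seconds)
```

Because Future.sequence preserves the input order, the flattened results come back in the order the operations were listed, regardless of which future finished first.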

Alexey Romanov

Sounds like a good use-case for the Try idiom (it's basically similar to the Either monad).

Example usage from the documentation:

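import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Success, Failure}

// `session` comes from the documentation's example and is not defined here
val f: Future[List[String]] = Future {
  session.getRecentPosts
}

f onComplete {
  case Success(posts) => for (post <- posts) println(post)
  case Failure(t) => println("An error has occurred: " + t.getMessage)
}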

It actually does a little bit more than what you asked because it is fully asynchronous. Does it fit your use-case?

Dici

Calling value on your Future returns an Option[Try[T]]. If the Future has not completed then the Option is None. If it has completed then it's easy to unwrap and process.

if (myFutr.isCompleted)
  myFutr.value.foreach(_.fold(
    (err: Throwable) => (),  // log the error here
    (ss: Seq[String]) => ()  // process the results here
  ))
else
  ()  // do something else, come back later
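
To make the Option[Try[T]] behaviour concrete, here is a small sketch that uses a Promise so the moment of completion is under our control. The Promise, the sample data, and the summary strings are illustrative assumptions, not part of the question's code.

```scala
import scala.concurrent.Promise
import scala.util.{Failure, Success}

// A Promise lets the demo decide exactly when the future completes.
val promise = Promise[Seq[String]]()
val myFutr = promise.future

val before = myFutr.value        // None: the future has not completed yet
promise.success(Seq("a", "b"))   // complete the future with a successful result
val after = myFutr.value         // Some(Success(...)): the future has completed

// Unwrap both layers in one pattern match.
val summary = after match {
  case Some(Success(ss))  => s"got ${ss.size} results"
  case Some(Failure(err)) => s"failed: ${err.getMessage}"
  case None               => "still running"
}
```

Note that value only takes a snapshot: if the future is still running, nothing here will notify you later, so this approach needs to be paired with a combinator or callback.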
jwvh
  • It seems like you're often going to miss the result with this code if your `else` branch isn't also blocking on the future or registering a callback – Dici Sep 22 '18 at 10:27
  • Using callbacks is another approach. The difficulty there is that a callback returns `Unit` and it's difficult to bring the result data into the main thread for further processing. – jwvh Sep 22 '18 at 10:33
  • @jwvh then you should at least loop until completion. And obviously it shouldn't be active waiting, otherwise it will have terrible performance. Your solution doesn't describe how to get the result back when the future doesn't return immediately. – Dici Sep 22 '18 at 10:34
  • I offered a small code excerpt. I assume (perhaps naively) the OP can assess its applicability within his context. – jwvh Sep 22 '18 at 10:37

I'd do it this way:

import scala.concurrent.{Future, ExecutionContext}
import scala.util.Success

def eitherify[A](f: Future[A])(implicit ec: ExecutionContext): Future[Either[Throwable, A]] = f.transform(tryResult => Success(tryResult.toEither))

def eitherifyF[A, B](f: A => Future[B])(implicit ec: ExecutionContext): A => Future[Either[Throwable, B]] = { a => eitherify(f(a)) }

// here we need some "cats" magic for `traverse` and `separate`
// instead of `traverse` you can use the standard `Future.sequence` or `Future.traverse`
// before Scala 2.13 the standard library had no analogue for `separate`;
// 2.13 added `partitionMap` to the collections

import cats.implicits._

def myProgram[A, B](values: List[A], asyncF: A => Future[B])(implicit ec: ExecutionContext): Future[(List[Throwable], List[B])] = {
  val appliedTransformations: Future[List[Either[Throwable, B]]] = values.traverse(eitherifyF(asyncF))
  appliedTransformations.map(_.separate)
}
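
For comparison, here is a sketch of the same idea without the cats dependency, assuming Scala 2.13 or later, where partitionMap is available on the standard collections. The name myProgramStd and the String-to-Int parser are made up for illustration.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.Success

implicit val ec: ExecutionContext = ExecutionContext.global

// Hypothetical name; mirrors myProgram above using only the standard library (2.13+).
def myProgramStd[A, B](values: List[A], asyncF: A => Future[B])(
    implicit ec: ExecutionContext): Future[(List[Throwable], List[B])] =
  Future
    // run asyncF for every value and capture each outcome as an Either
    .traverse(values)(a => asyncF(a).transform(t => Success(t.toEither)))
    // Lefts go to the first list, Rights to the second
    .map(_.partitionMap(identity))

// Illustrative async function: parsing fails for non-numeric input.
val parse: String => Future[Int] = s => Future(s.toInt)

// Blocking is only for the demo.
val (failures, parsed) =
  Await.result(myProgramStd(List("1", "x", "3"), parse), 5.seconds)
```

As with separate, the failures end up in the first element of the tuple and the successes in the second.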
Mikhail Golubtsov