
I want to turn the following sequential code into concurrent code with Futures and need advice on how to structure it.

sequential:

import java.net.URL

val providers = List(
  new URL("http://www.cnn.com"),
  new URL("http://www.bbc.co.uk"),
  new URL("http://www.othersite.com")
)

def download(urls: URL*) = urls.flatMap(url => io.Source.fromURL(url).getLines).distinct

val res = download(providers:_*)

I want to download all sources passed in via the varargs of the download method and combine the results into a single collection (Seq, List, or Set; it doesn't matter which). If one Future fails, say because a server is unreachable, the remaining results should still be collected and returned. firstCompletedOf won't work because I need the results of all Futures except those that failed. I thought about using Future.sequence as shown below, but I can't get it to work. Here is what I tried:

def download(urls: URL*) = Future.sequence {
  urls.map { url =>
    Future {
      io.Source.fromURL(url).getLines
    }
  }
} 

This produces a Seq[Future[Iterator[String]]] which is not compatible with M_[Future[A_]].

A Future[Iterator[String]] is what I want. (I chose to return an Iterator because I need to reuse it later via the reset method on Iterator.)

user3350744

1 Answer

You can use parallel collections:

import java.net.URL

val providers = List(
  new URL("http://www.cnn.com"),
  new URL("http://www.bbc.co.uk"),
  new URL("http://www.othersite.com")
)

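import scala.util.{Failure, Success, Try}

// Note: on Scala 2.13+, .par requires the separate scala-parallel-collections module.
// Fetch all URLs in parallel; a URL that fails contributes no lines
// instead of failing the whole call.
def download(urls: URL*): Seq[String] = urls.par.flatMap { url =>
  // Read the lines eagerly inside Try so that errors raised while reading
  // (not only while connecting) are caught as well.
  Try(io.Source.fromURL(url).getLines().toList) match {
    case Success(lines) => lines
    case Failure(_)     => Nil
  }
}.toList // convert the parallel collection back to a sequential one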

val res: Seq[String] = download(providers:_*)

Or, if you want the non-blocking version with a Future:

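import scala.concurrent.{Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}

def download(urls: URL*): Future[Seq[String]] = Future {
  // blocking tells the ExecutionContext that this task will block a thread.
  blocking {
    urls.par.flatMap { url =>
      // Read eagerly inside Try so that failures while reading are caught too.
      Try(io.Source.fromURL(url).getLines().toList) match {
        case Success(lines) => lines
        case Failure(_)     => Nil
      }
    }.toList // convert the parallel collection back to a sequential one
  }
}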

val res: Future[Seq[String]] = download(providers:_*)
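
For comparison, the Future.sequence approach from the question can probably also be made to tolerate failures by recovering each Future individually before sequencing. The following is only a minimal sketch under some assumptions: `fetchLines` and `downloadAll` are hypothetical names that do not appear in the original post, and the `fetch` parameter exists solely so that the combinator can be tried without network access.

```scala
import java.net.URL
import scala.concurrent.{ExecutionContext, Future, blocking}
import scala.io.Source

// Hypothetical helper (not from the original post): reads all lines of one URL
// eagerly and closes the Source afterwards.
def fetchLines(url: URL): List[String] = {
  val source = Source.fromURL(url)
  try source.getLines().toList
  finally source.close()
}

// Hypothetical name; `fetch` defaults to fetchLines and can be swapped for a stub.
def downloadAll(urls: Seq[URL], fetch: URL => List[String] = fetchLines)
               (implicit ec: ExecutionContext): Future[Seq[String]] = {
  // One Future per URL; a failed download is replaced by an empty result,
  // so a single unreachable server cannot fail the combined Future.
  val attempts: Seq[Future[List[String]]] = urls.map { url =>
    Future(blocking(fetch(url))).recover { case _: Exception => Nil }
  }
  // Seq[Future[List[String]]] => Future[Seq[List[String]]] => Future[Seq[String]]
  Future.sequence(attempts).map(_.flatten.distinct)
}
```

With an implicit ExecutionContext in scope, `downloadAll(providers)` would return a Future[Seq[String]] containing the distinct lines of every provider that could be read. Each download still blocks a thread of the ExecutionContext while it runs.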
Yuval Itzchakov
  • the Future-example is not non-blocking, it is merely async. – Viktor Klang Aug 12 '16 at 05:42
  • @ViktorKlang The download method is non-blocking. The fetching process via `io.Source.fromURL` is synchronous. There's nothing "async" here at all. Sending an HTTP request is a naturally asynchronous operation, there isn't a need for a thread here at all, but that's what the OP is using. – Yuval Itzchakov Aug 12 '16 at 06:18
  • `download` is asynchronous in the sense that it will return (a Future) before the work has been performed. Since Future.apply's work entails calling a blocking method (fromURL(…).getLines) it means that if the ExecutionContext runs synchronously then the method blocks the caller, otherwise it will block the thread which executes the logic. Therefore it is not non-blocking (some thread will be blocked) and it is asynchronous, likely concurrent and uses a parallelization construct. (Disclaimer: I am the main author of Scala Futures in the Scala Standard Library) – Viktor Klang Aug 12 '16 at 08:08
  • @ViktorKlang I know who you are :). `download` is "fake-asynchrony". It is using a blocking method `fromURL` to perform work that is actually *asynchronous by nature*, such as sending a request over the wire. It is non-blocking to the consumer, but internally it is consuming an ExecutionContext thread to *block* on work that it really *shouldn't*. We've had this discussion on twitter before :), and I said Java/Scala is missing a construct that tells the caller "This is actually an IO completion". – Yuval Itzchakov Aug 12 '16 at 08:32
  • No, it isn't "fake-asynchrony": either it executes synchronously (sync EC, highly discouraged) or it is asynchronous (returns before work is completed). If the former then it blocks the caller thread and if the latter then it blocks an EC thread. No matter what, it will block *something* :) – Viktor Klang Aug 12 '16 at 08:37
  • @ViktorKlang I think you and I use different terminology for the meaning of asynchronous. A non-blocking method doesn't make it "asynchronous", it makes it *parallel*. When I execute an asynchronous operation, I don't want any thread to block at all, as there is no actual need. If my device driver (i.e. network card) is capable of callback based completion, why do I need to block? – Yuval Itzchakov Aug 12 '16 at 08:39
  • As for Non-blocking vs asynchronous I think this covers it pretty well: http://stackoverflow.com/a/2625565/227803 I don't understand how you draw the line from *asynchronous* to *parallel*, *parallel* is a function of execution (having multiple physical execution units), *parallelization* is the means of enabling multiple execution units to work on a task, *concurrency* is the logical separation of units of execution (could be multiplexed over 1…N physical execution units). – Viktor Klang Aug 12 '16 at 08:51
  • Let us [continue this discussion in chat](http://chat.stackoverflow.com/rooms/120776/discussion-between-yuval-itzchakov-and-viktor-klang). – Yuval Itzchakov Aug 12 '16 at 09:07
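
The point made in the comments, that a blocking call inside a Future always blocks some thread and only the choice of ExecutionContext decides which one, can be illustrated with a small sketch. Everything here is an assumption for illustration: `callerRuns` is a hand-written synchronous ExecutionContext (the kind described above as highly discouraged) and `blockingWork` is a stand-in that sleeps instead of calling fromURL.

```scala
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Illustration only: an ExecutionContext that runs every task on the calling thread.
val callerRuns: ExecutionContext = new ExecutionContext {
  def execute(runnable: Runnable): Unit = runnable.run()
  def reportFailure(cause: Throwable): Unit = cause.printStackTrace()
}

// Stand-in for a blocking call such as fromURL(...).getLines:
// sleeps, then reports the name of the thread that was blocked.
def blockingWork(): String = {
  Thread.sleep(200)
  Thread.currentThread().getName
}

val caller = Thread.currentThread().getName

// Synchronous EC: the caller itself is blocked, so the Future is
// already complete by the time Future.apply returns.
val onCaller = Future(blockingWork())(callerRuns)
val completedOnReturn = onCaller.isCompleted

// Thread-pool EC: the call returns right away and a pool thread is blocked instead.
val onPool = Future(blockingWork())(ExecutionContext.global)
val pendingOnReturn = !onPool.isCompleted

val blockedCallerThread = Await.result(onCaller, 1.second) == caller
val blockedPoolThread = Await.result(onPool, 5.seconds) != caller
```

In both cases a thread sits idle for the duration of the call; a truly non-blocking download would need a callback-based HTTP client rather than io.Source.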