1

I'm writing a program that scrapes links from oxfordlearnersdictionaries.com using Rust. I'm using hyper and futures.

I have a collection of links, one per section, and use stream::unfold to build a stream that follows each section's pages:

// Returns the scraped links and, if there is one, the Uri of the next page.
fn process_body_and_return_next(body: Chunk) -> (Vec<String>, Option<Uri>) { ... }

// In main()
let mut core = Core::new().unwrap();
let handle = core.handle();
let client = Client::new(&handle);
let uris = ...

let jobs = uris.map(|uri| {
    stream::unfold(Some(uri), |uri| {
        uri.map(|uri| {
            client
                .get(uri)
                .and_then(|res| res.body().concat2())
                .map(process_body_and_return_next)
        })
    })
});

Now I've got an impl Iterator<Item = impl Stream<Item = Vec<String>>>. How can I merge it into a single Stream of Vecs, the way stream::futures_unordered merges Futures?


Edit: I tried combining stream::iter_ok and stream::Stream::flatten:

let flattened = ::futures::stream::iter_ok(jobs)
    .flatten();

But that's not efficient, since I want to send several web requests concurrently. The combined Stream should produce a value whenever an inner Stream is ready.

Peng Guanwen

2 Answers

4

An iterator may be turned into a stream using futures::stream::iter_ok. This allows your iterator of streams to be turned into a stream of streams:

::futures::stream::iter_ok(jobs)

You can then flatten this stream of streams into a single stream of all items using Stream::flatten():

let flattened = ::futures::stream::iter_ok(jobs)
    .flatten();
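
Note that flatten drains each inner stream to completion before it moves on to the next one, so the inner streams do not make progress concurrently. As a rough illustration of the resulting order, here is a std-only analogy using Iterator::flatten rather than the futures API; `flatten_sections` is a hypothetical helper name introduced only for this sketch:

```rust
// Std-only analogy, NOT the futures API: Iterator::flatten visits inner
// collections in the same sequential order as Stream::flatten visits streams.
// `flatten_sections` is a hypothetical helper for illustration only.
fn flatten_sections(sections: Vec<Vec<&'static str>>) -> Vec<&'static str> {
    // Each inner collection is drained completely before the next one starts.
    sections.into_iter().flatten().collect()
}
```

Every item of the first section comes out before any item of the second, regardless of which one would have been ready first.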
Lukazoid
  • But in that way the items will be evaluated one by one, rather than simultaneously. Is there a `Stream` version of `futures_unordered`? – Peng Guanwen Feb 14 '18 at 14:26
  • @PengGuanwen your requirement is not clear. You wanted a `Stream` of items that are produced by the inner streams. How could anything be delivered "simultaneously"? – Stefan Feb 14 '18 at 15:27
  • I need an unordered buffered `Stream`. When you combine an `Iterator` of `Future`s, you can use either `stream::futures_ordered` or `stream::futures_unordered`, and if you use the latter, the `Stream` will produce a value whenever one of the `Future`s is ready. When you have a `Stream` of `Future`s, you can use `stream::Stream::buffer_unordered` to achieve this. But in my case, an `Iterator` of `Stream`s is hard to combine that way, and I can't figure out why. – Peng Guanwen Feb 15 '18 at 02:17
0

The select combinator takes two Streams and yields an item whenever either of them is ready.

In order to select from more than two streams, you can chain calls to select. However, since you don't know in advance the number of streams you have to select on, you'll have to box the intermediate streams in order to erase the specific Stream type, so that the program type-checks.

extern crate futures;

use futures::Stream;

fn select_all<'a, I, T, E>(seq: I) -> Box<Stream<Item = T, Error = E> + 'a>
where
    I: IntoIterator,
    I::Item: Stream<Item = T, Error = E> + 'a,
    T: 'a,
    E: 'a,
{
    let mut iter = seq.into_iter();
    let mut result = Box::new(iter.next().expect("got an empty list of streams"))
        as Box<Stream<Item = T, Error = E>>;
    for next in iter {
        result = Box::new(result.select(next));
    }

    result
}

There's certainly a more efficient way to implement this, though. There is a select_all combinator for futures, but there isn't one yet for streams. Perhaps you could implement it yourself and submit it as a pull request!
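
To give an idea of what a flatter implementation might look like, here is a toy, std-only model of the polling logic. It is a sketch under loud assumptions, not the futures API: `Poll`, `Scripted`, `SelectAll` and `drain` are all hypothetical names, `Scripted` replays a fixed script of "ready" and "not ready" results instead of doing I/O, and `drain` busy-loops where a real executor would park the task until it is woken. The point is only that all inner streams can live in one Vec and be polled in turn, instead of being nested in a chain of boxed selects:

```rust
use std::collections::VecDeque;

/// Simplified stand-in for the result of polling a stream (hypothetical).
enum Poll<T> {
    Ready(T),
    NotReady,
    Finished,
}

/// Toy stream that replays a script; `None` models "not ready yet".
struct Scripted<T> {
    steps: VecDeque<Option<T>>,
}

impl<T> Scripted<T> {
    fn new(steps: Vec<Option<T>>) -> Self {
        Scripted { steps: steps.into_iter().collect() }
    }

    fn poll(&mut self) -> Poll<T> {
        match self.steps.pop_front() {
            Some(Some(item)) => Poll::Ready(item),
            Some(None) => Poll::NotReady,
            None => Poll::Finished,
        }
    }
}

/// Flat merge of any number of toy streams, kept in a single Vec.
struct SelectAll<T> {
    streams: Vec<Scripted<T>>,
}

impl<T> SelectAll<T> {
    fn poll(&mut self) -> Poll<T> {
        let mut i = 0;
        while i < self.streams.len() {
            match self.streams[i].poll() {
                Poll::Ready(item) => {
                    // Move the stream that just produced to the back for fairness.
                    let stream = self.streams.remove(i);
                    self.streams.push(stream);
                    return Poll::Ready(item);
                }
                Poll::Finished => {
                    // Drop the exhausted stream; `i` now points at the next one.
                    self.streams.remove(i);
                }
                Poll::NotReady => i += 1,
            }
        }
        if self.streams.is_empty() {
            Poll::Finished
        } else {
            Poll::NotReady
        }
    }
}

/// Polls until every inner stream is finished and collects the items.
fn drain<T>(mut merged: SelectAll<T>) -> Vec<T> {
    let mut out = Vec::new();
    loop {
        match merged.poll() {
            Poll::Ready(item) => out.push(item),
            // A real executor would park here until woken; the toy just retries.
            Poll::NotReady => continue,
            Poll::Finished => return out,
        }
    }
}
```

With one stream scripted as "not ready, a1, a2" and another as "b1, not ready, b2", draining the merge yields b1 first because it is ready first, which is the behaviour the question asks for.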

Francis Gagné
  • It seems that there is a [`select_all`](https://docs.rs/futures/0.2/futures/stream/fn.select_all.html) for streams in futures 0.2 (not stable version). It seems to be implemented without any use of `Box`. – 0xcaff Nov 11 '18 at 18:48