10

I have a collection of futures which I want to combine into a single future that gets them executed sequentially.

I looked into the futures_ordered function. It seems to return the results sequentially but the futures get executed concurrently.

I tried to fold the futures, combining them with and_then. However, that is tricky with the type system.

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = tasks.into_iter().fold(
    ok(()),                             // seed
    |acc, task| acc.and_then(|_| task), // accumulator
);


This gives the following error:

error[E0308]: mismatched types
  --> src/main.rs:10:21
   |
10 |         |acc, task| acc.and_then(|_| task), // accumulator
   |                     ^^^^^^^^^^^^^^^^^^^^^^ expected struct `futures::FutureResult`, found struct `futures::AndThen`
   |
   = note: expected type `futures::FutureResult<_, _>`
              found type `futures::AndThen<futures::FutureResult<_, _>, futures::FutureResult<(), _>, [closure@src/main.rs:10:34: 10:42 task:_]>`

I'm probably approaching this wrong but I've run out of ideas.

Shepmaster
Jaanus Varus
  • Posting as a comment rather than an answer as I don't know how to fix it off the top of my head: since you provide `ok(())` - which returns a `FutureResult` - as the initial value, `fold` expects you to return a `FutureResult` from each iteration of the closure. In other words, the inferred types are too specific. – Joe Clay Jan 03 '18 at 17:06
  • *a single future that gets them executed sequentially* — why would you want to do such a thing? Since your futures don't depend on each other, there's no obvious reason to introduce forced serialization. – Shepmaster Jan 03 '18 at 18:08

6 Answers

6

Stream has a function buffered which allows you to limit how many futures are polled concurrently.

If you have a collection of futures, you can create a stream and use buffered like so:

let tasks = vec![future1, future2];
let stream = ::futures::stream::iter_ok(tasks);
let mut when_result_ready = stream.buffered(1);

when_result_ready will now be a Stream implementation which only polls one future at a time and moves to the next once each future completes.

Update

Based on the comments and my own profiling, buffered appears to have a large overhead, so another solution is to convert each Future to a Stream and flatten them:

iter_ok(tasks).map(|f|f.into_stream()).flatten()

The documentation for flatten states that "each individual stream will get exhausted before moving on to the next", meaning no Future will be polled before the previous one has completed. In my local profiling this seems to be ~80% faster than the buffered approach.

Both of my approaches above produce a Stream of results in which each source Future is polled sequentially and its result returned. What the asker actually wanted was a single Future at the end, not the result of each source Future. If that is the case, the answer from Stefan may be more useful and is likely to perform better.

Lukazoid
  • Thanks! I especially like the granularity of being able to specify level of concurrency through buffer size. – Jaanus Varus Jan 03 '18 at 18:27
  • I think `buffered` has a high overhead (there should be some `Arc` objects in the middle passed to `with_notify` every time an inner future is polled). Boxing might be cheaper than `buffered(1)`. – Stefan Jan 04 '18 at 08:21
  • @Stefan I did a quick bench and the boxed variant is roughly 30% faster than the buffer solution, with a much lower standard deviation. Although I'm not confident that none of the crucial parts were stripped away by optimizations. https://play.rust-lang.org/?gist=d5dc4ef2eb5f0611848edc9f3829fa40&version=nightly – Jaanus Varus Jan 04 '18 at 15:40
  • Actually, `and_then(|f| f)` ([`Stream::and_then`](https://docs.rs/futures/0.1.17/futures/stream/trait.Stream.html#method.and_then)) should be "the same" as `buffered(1)`, just a lot faster. – Stefan Jan 05 '18 at 08:03
6

Combine iter_ok and Stream::for_each:

use futures::Stream;
use futures::future::ok;
use futures::stream::iter_ok;

let tasks = vec![ok(()), ok(()), ok(())];

let combined_task = iter_ok::<_, ()>(tasks).for_each(|f| f);

iter_ok produces a stream of the passed items and never produces an error itself (which is why you sometimes need to specify the error type explicitly, as with iter_ok::<_, ()> above). The closure passed to for_each then returns a Future to be run for each item - here simply the items that were passed in.

for_each then drives each returned future to completion before moving to the next one, like you wanted. It will also abort with the first error it encounters, and requires the inner futures to return () on success.

for_each itself returns a Future that will either fail (as described above) or return () on completion.
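For readers on current Rust, the behaviour described above (run one future at a time, stop at the first error, yield () on success) can be sketched with async/await and the standard library alone. This is an illustration rather than the futures 0.1 API: `run_sequentially`, `step`, `demo`, `NoopWaker` and the tiny busy-polling `block_on` are names assumed for this example, and the executor exists only so the sketch runs without external crates.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Awaits each future in order and stops at the first error.
async fn run_sequentially<I, F, E>(tasks: I) -> Result<(), E>
where
    I: IntoIterator<Item = F>,
    F: Future<Output = Result<(), E>>,
{
    for task in tasks {
        // The next task is not polled until this one has completed.
        task.await?;
    }
    Ok(())
}

/// Hypothetical task: records its id in a shared log, then optionally fails.
fn step(log: Rc<RefCell<Vec<u32>>>, id: u32, fail: bool) -> impl Future<Output = Result<(), String>> {
    async move {
        log.borrow_mut().push(id);
        if fail {
            Err(format!("task {} failed", id))
        } else {
            Ok(())
        }
    }
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, for illustration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Runs three tasks where the second one fails; returns the outcome and the ids that ran.
fn demo() -> (Result<(), String>, Vec<u32>) {
    let log = Rc::new(RefCell::new(Vec::new()));
    let tasks = vec![
        step(log.clone(), 1, false),
        step(log.clone(), 2, true),
        step(log.clone(), 3, false),
    ];
    let outcome = block_on(run_sequentially(tasks));
    let seen = log.borrow().clone();
    (outcome, seen)
}

fn main() {
    let (outcome, seen) = demo();
    println!("outcome: {:?}, tasks that ran: {:?}", outcome, seen);
}
```

In this sketch the third task never runs, because the second one fails and the `?` returns early.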

Benchmark results for the buffered, boxed and for_each variants:

test tests::bench_variant_buffered ... bench:      22,356 ns/iter (+/- 1,816)
test tests::bench_variant_boxed    ... bench:       8,575 ns/iter (+/- 1,042)
test tests::bench_variant_for_each ... bench:       4,070 ns/iter (+/- 531)
Shepmaster
Stefan
  • This should be accepted instead of my answer, I completely missed that the asker actually wanted a `Future` at the end. I wrote my answer based upon the OP wanting a `Stream`. – Lukazoid Jan 04 '18 at 20:32
  • Accepted @Stefan's answer for the most optimal solution in my scenario. Also big thanks to Lukazoid and Shepmaster - I learned a lot about the futures crate. – Jaanus Varus Jan 05 '18 at 22:04
2

As mentioned in the comments, your types are too concrete.

You can envision the implementation of fold as doing something like this:

let (task0, task1, task2) = (ok(()), ok(()), ok(()));

let mut combined_task = ok(()); // seed
combined_task = combined_task.and_then(|_| task0); 
combined_task = combined_task.and_then(|_| task1); 
combined_task = combined_task.and_then(|_| task2); 

The variable combined_task needs to be updated in place with a new value of the same type. Since we start with ok(()), that's the type each step needs to return. However, the return type of and_then is different; it's an AndThen. In fact, AndThen is a generic type containing the closure and the underlying future, so each step will produce a distinct type with potentially a different size:

  1. FutureResult<()>
  2. AndThen<FutureResult<()>, closure0>
  3. AndThen<AndThen<FutureResult<()>, closure0>, closure1>
  4. AndThen<AndThen<AndThen<FutureResult<()>, closure0>, closure1>, closure2>
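The growth of the type at each step can be made visible with a small std-only sketch. `Ready` and `Then` below are hypothetical stand-ins for `FutureResult` and `AndThen`, not the real futures types; they only mimic the way a combinator wraps the previous value in a new generic struct.

```rust
use std::any::type_name;
use std::mem::size_of;

// Hypothetical stand-in for `FutureResult`: a leaf value.
#[allow(dead_code)]
struct Ready(u64);

// Hypothetical stand-in for `AndThen`: wraps the previous step plus the next one.
#[allow(dead_code)]
struct Then<Prev, Next> {
    prev: Prev,
    next: Next,
}

/// Returns the type name and size in bytes of the given value.
fn describe<T>(_value: &T) -> (&'static str, usize) {
    (type_name::<T>(), size_of::<T>())
}

/// Builds three nested steps and reports the type and size of each.
fn steps() -> Vec<(&'static str, usize)> {
    let mut report = Vec::new();

    let step0 = Ready(0);
    report.push(describe(&step0));

    // A new binding is required here: assigning a `Then<..>` back to a
    // variable of type `Ready` would be rejected with "mismatched types".
    let step1 = Then { prev: step0, next: Ready(1) };
    report.push(describe(&step1));

    let step2 = Then { prev: step1, next: Ready(2) };
    report.push(describe(&step2));

    report
}

fn main() {
    for (name, size) in steps() {
        println!("{} bytes: {}", size, name);
    }
}
```

Each step has its own type and a larger size than the one before, which is why a single accumulator variable of the seed's type cannot hold all of them.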

Instead, you can create a unified type by producing a boxed trait object at each step:

let (task0, task1, task2) = (ok(()), ok(()), ok(()));

let mut combined_task: Box<Future<Item = (), Error = ()>> = Box::new(ok(())); // seed
combined_task = Box::new(combined_task.and_then(|_| task0)); 
combined_task = Box::new(combined_task.and_then(|_| task1)); 
combined_task = Box::new(combined_task.and_then(|_| task2));

Now every step has the same type:

  1. Box<Future<Item = (), Error = ()>>
  2. Box<Future<Item = (), Error = ()>>
  3. Box<Future<Item = (), Error = ()>>
  4. Box<Future<Item = (), Error = ()>>

Converting back to the fold syntax:

let combined_task: Box<Future<Item = (), Error = ()>> =
    tasks.into_iter().fold(Box::new(ok(())), |acc, task| {
        Box::new(acc.and_then(|_| task))
    });
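The same boxing idea carries over to std futures and async/await. The following is a minimal sketch under that assumption, not the futures 0.1 code above: `BoxedTask`, `NoopWaker`, `block_on` and `run_in_order` are names invented for this illustration, and the busy-polling executor is there only so the example runs without any external crate.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::Pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// One uniform type for the seed, every task and every accumulator step.
type BoxedTask = Pin<Box<dyn Future<Output = ()>>>;

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, for illustration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

/// Folds three boxed tasks into one future and returns the order in which they ran.
fn run_in_order() -> Vec<u32> {
    let log = Rc::new(RefCell::new(Vec::new()));

    // Each task records its id when it runs.
    let tasks: Vec<BoxedTask> = (1..=3u32)
        .map(|id| {
            let log = Rc::clone(&log);
            Box::pin(async move {
                log.borrow_mut().push(id);
            }) as BoxedTask
        })
        .collect();

    let seed: BoxedTask = Box::pin(async {});
    let combined = tasks.into_iter().fold(seed, |acc, task| {
        // Boxing erases the concrete type, so each step returns `BoxedTask` again.
        Box::pin(async move {
            acc.await;
            task.await;
        }) as BoxedTask
    });

    block_on(combined);
    let order = log.borrow().clone();
    order
}

fn main() {
    println!("{:?}", run_in_order()); // prints [1, 2, 3]
}
```

The cost is the same as in the boxed version above: one heap allocation and one layer of dynamic dispatch per step.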


Shepmaster
  • Thanks! I was thinking of this but prefer to avoid heap allocations / runtime dispatch if possible. Although I'm not sure how the other buffered stream approach is internally implemented. – Jaanus Varus Jan 03 '18 at 18:25
  • Hi @Shepmaster, can you please explain more about why adding `Box` fixes this issue? I ran into the same problem: if I specify that the function returns `impl Future`, the program fails to compile with `expect futures::FutureResult, found futures::AndThen`, but when I changed the return type to `Box>`, everything compiled. – Songday Jul 27 '18 at 03:35
  • @Songday I have a much longer explanation in another answer (now linked from this answer). Can you check that and see if it explains it adequately? If not, let me know what's missing and I'll try to update. – Shepmaster Jul 27 '18 at 13:05
  • Thank you @Shepmaster, I read your post several times but can't understand it completely. My understanding of `Box` is that everything which is unsized needs a `Box` wrapper. In my case, the function returns an `impl Future` trait that is unsized, so Rust refuses to compile, right? I still feel I am misunderstanding something.... – Songday Jul 30 '18 at 03:42
  • @Songday not quite; unsized types don't really play into it here. The problem is that each step has a different type (which can have different sizes). `impl Future` is *not* unsized. The compiler knows the concrete type and size, although the programmer does not. – Shepmaster Jul 30 '18 at 14:47
2

In my case (stable async/await) this code was very helpful:

use futures::{stream, StreamExt};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error + 'static>> {
    let data = vec![1,2,3];

    stream::iter(data).for_each(|id| async move {
        let request = async { id }; // async io request
        let res = request.await;
        println!("res: {:?}", res);
        ()
    }).await;
    
    Ok(())
}

https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=ad5feaf0cbb3597730c22df2eaf4a606

Kirill A. Khalitov
1

When I needed something like this (mainly because I was debugging an issue) I ended up writing a seq combinator composing loop_fn like so:

use futures::future::{self, loop_fn, Either, Future, IntoFuture, Loop};

fn seq<I>(
    i: I,
) -> impl Future<Item = Vec<<I::Item as IntoFuture>::Item>, Error = <I::Item as IntoFuture>::Error>
where
    I: IntoIterator,
    I::Item: IntoFuture,
{
    let iter = i.into_iter();
    loop_fn((vec![], iter), |(mut output, mut iter)| {
        let fut = if let Some(next) = iter.next() {
            Either::A(next.into_future().map(|v| Some(v)))
        } else {
            Either::B(future::ok(None))
        };

        fut.and_then(move |val| {
            if let Some(val) = val {
                output.push(val);
                Ok(Loop::Continue((output, iter)))
            } else {
                Ok(Loop::Break(output))
            }
        })
    })
}
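With async/await, a combinator of this shape needs no explicit loop state. The sketch below is a std-only approximation of the idea (run in order, collect every result into a Vec, stop at the first error), not a translation of the futures 0.1 code: this `seq`, `NoopWaker` and the busy-polling `block_on` are written for illustration only.

```rust
use std::future::{ready, Future};
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Awaits the futures one after another, collecting their results in order.
async fn seq<I, F, T, E>(tasks: I) -> Result<Vec<T>, E>
where
    I: IntoIterator<Item = F>,
    F: Future<Output = Result<T, E>>,
{
    let mut output = Vec::new();
    for task in tasks {
        // `?` returns early on the first error, like `and_then` would.
        output.push(task.await?);
    }
    Ok(output)
}

struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Minimal busy-polling executor, for illustration only.
fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = Box::pin(fut);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    let all_ok = vec![ready(Ok::<u32, String>(1)), ready(Ok(2)), ready(Ok(3))];
    println!("{:?}", block_on(seq(all_ok)));

    let with_error = vec![
        ready(Ok::<u32, String>(1)),
        ready(Err("boom".to_string())),
        ready(Ok(3)),
    ];
    println!("{:?}", block_on(seq(with_error)));
}
```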
Raj
-1

An alternative

Cargo.toml

[dependencies]
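futures = "0.3"

use futures::future::ok;
use futures::stream::{self, StreamExt};

async fn sequentially() {
    // The error type has to be spelled out once so the element type can be inferred.
    let tasks = vec![ok::<(), ()>(()), ok(()), ok(())];

    // A limit of 1 means at most one of the closure's futures is in flight at a time.
    stream::iter(tasks).for_each_concurrent(1, |task| async move {
        // async code here
        let _ = task.await;
    }).await;
}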
Roman Rhrn Nesterov
  • This does not behave as requested - [`for_each_concurrent()`, as implied by its name, executes the future concurrently and not sequentially](https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.for_each_concurrent). You need `for_each()`, but there is already an answer saying that. – Chayim Friedman May 10 '23 at 22:15