1

I am trying to write a function that consumes a Stream and truncates it once there are max_consecutive_fails consecutive failures. However, it fails to compile (E0495). When I changed the Streams to Iterators (and removed the asyncs), it simply worked. Why does this happen, and how can I refactor this code so that it compiles?

use futures::stream::Stream;
pub fn max_fail<'a, T>(
    stream: impl Stream<Item = Option<T>> + 'a,
    max_consecutive_fails: usize,
) -> impl Stream + 'a
where
    T: 'a,
{
    use futures::stream::StreamExt;
    let mut consecutive_fails = 0;
    stream.take_while(move |x| async {
        if x.is_some() {
            consecutive_fails = 0;
            true
        } else {
            consecutive_fails += 1;
            consecutive_fails != max_consecutive_fails
        }
    })
}

Below is a minimized example I wrote to isolate the problem, but I still couldn't understand the rustc error message.

use futures::stream::Stream;
pub fn minified_example<'a>(stream: impl Stream<Item = bool> + 'a) -> impl Stream + 'a {
    use futures::stream::StreamExt;
    stream.take_while(|x| async { *x })
}
Lee SongUn
  • Please refrain from including "thanks" or similar in your question. See here for more explanation: https://meta.stackexchange.com/questions/2950/should-hi-thanks-taglines-and-salutations-be-removed-from-posts , and here: https://meta.stackoverflow.com/questions/288160/no-thanks-damn-it – PiRocks May 03 '20 at 17:39

2 Answers

2

Async blocks (async { ... }) capture their environment much like closures do. By default, each variable used from the enclosing scope is captured by reference, which means the impl core::future::Future created by the block cannot outlive the variables it captures.

You need to move x into the block with async move { ... }, just as you would with a closure.
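As a rough illustration of the capture rule, here is a std-only sketch. `call_pred`, `block_on`, `NoopWake` and `keep` are hypothetical helpers that are not part of the question; `call_pred` only imitates the bound that `take_while` places on its predicate (`FnMut(&Self::Item) -> Fut`, with a single future type `Fut`). Because the closure argument is itself a reference, the sketch copies the value out first and then moves the copy into the block, so the future borrows nothing.

```rust
use std::future::Future;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical waker that does nothing; enough for futures that are ready at once.
struct NoopWake;
impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Hypothetical minimal executor: polls the future until it completes.
fn block_on<F: Future>(fut: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut fut = Box::pin(fut);
    loop {
        if let Poll::Ready(value) = fut.as_mut().poll(&mut cx) {
            return value;
        }
    }
}

// Stand-in for the predicate bound used by `take_while`: the same future
// type `Fut` must work for every borrow, so it cannot borrow from `item`.
fn call_pred<F, Fut>(mut pred: F, item: bool) -> Fut
where
    F: FnMut(&bool) -> Fut,
    Fut: Future<Output = bool>,
{
    pred(&item)
}

// Copy the value out of the reference, then move the copy into the future.
fn keep(item: bool) -> bool {
    let fut = call_pred(
        |x| {
            let v = *x;
            async move { v }
        },
        item,
    );
    block_on(fut)
}
```

The future returned by the closure owns a plain `bool`, which is why it can outlive the call that produced it.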

Plecra
0

So the future captures the variables it uses, and the compiler isn't smart enough to remove unnecessary captures. What one should do is explicitly disentangle the captures: do the bookkeeping in the closure body and return a separate async block that only carries the result.

use futures::stream::Stream;
pub fn max_fail<'a, T>(
    stream: impl Stream<Item = Option<T>> + 'a,
    max_consecutive_fails: usize,
) -> impl Stream + 'a
where
    T: 'a,
{
    use futures::stream::StreamExt;
    let mut consecutive_fails = 0;
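    stream.take_while(move |x| {
        // Update the counter synchronously, while `x` is still borrowed.
        let t = if x.is_some() {
            consecutive_fails = 0;
            true
        } else {
            consecutive_fails += 1;
            consecutive_fails != max_consecutive_fails
        };
        // The future owns only the resulting `bool`, so it borrows nothing.
        async move { t }
    })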
}
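
For comparison, the iterator version mentioned in the question might look like the sketch below. The name `max_fail_iter` is hypothetical and the body is an assumption reconstructed from the stream code above. No future is created here, so the predicate returns a plain `bool` and nothing needs to outlive the borrow of `x`.

```rust
// Hypothetical iterator counterpart of `max_fail`: stops at the
// `max_consecutive_fails`-th consecutive `None`, which is not yielded.
pub fn max_fail_iter<'a, T: 'a>(
    iter: impl Iterator<Item = Option<T>> + 'a,
    max_consecutive_fails: usize,
) -> impl Iterator<Item = Option<T>> + 'a {
    let mut consecutive_fails = 0;
    iter.take_while(move |x| {
        if x.is_some() {
            // A success resets the failure counter.
            consecutive_fails = 0;
            true
        } else {
            consecutive_fails += 1;
            consecutive_fails != max_consecutive_fails
        }
    })
}
```

With `max_consecutive_fails` set to 2, a single `None` between successes is passed through, and the sequence ends at the second `None` in a row.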
Lee SongUn