-1

I have a struct with a member. Its implementation returns an async stream that changes this member on a specific event (in the example, on every item the stream emits).

This fails with a lifetime error, which is understandable, since the struct itself does not necessarily live as long as the closure passed to `map` in the stream. It sounds similar to "lifetime around async and stream", but I am not sure if it is related.

use async_std::pin::Pin;
use futures::{Stream, StreamExt};
use std::time::Duration;

/// Holds the counter that the stream returned by `returnastream` tries to update.
struct StreamProvider {
    // Incremented by one inside the stream's `map` closure.
    value: u16,
}

impl StreamProvider {
    /// Builds a boxed stream on a 1000 ms interval that bumps `self.value`
    /// on every tick and yields `1` each time.
    ///
    /// NOTE: this version does not compile (E0495). The closure passed to
    /// `map` captures `self` by reference, but the return type names no
    /// lifetime, so the boxed trait object is required to be `'static`.
    fn returnastream(self: &mut Self) -> Pin<Box<dyn Stream<Item = i32>>> {
        return async_std::stream::interval(Duration::from_millis(1000))
            .map(|_| {
                // change a value of Self within the stream
                self.value = self.value + 1;
                1
            })
            .boxed();
    }
}

/// Entry point: builds a `StreamProvider` and prints every item its stream yields.
#[async_std::main]
async fn main() {
    let mut object = StreamProvider { value: 1 };

    let mut worx = object.returnastream();
    // Subscribe to the items. Binding `Some(value)` (rather than an
    // irrefutable `item` pattern) lets the loop end as soon as the stream
    // reports `None` instead of polling a finished stream forever.
    while let Some(value) = worx.next().await {
        println!("{}", value);
    }
}
[dependencies]
futures = "0.3.6"
async-std = { version = "1.6.5", features = ["attributes", "unstable"] }

The Error message:

/Users/andre/.cargo/bin/cargo run --color=always --package traittest --bin traittest
   Compiling traittest v0.1.0 (/Users/andre/repos/traittest)
error[E0495]: cannot infer an appropriate lifetime for lifetime parameter `'a` due to conflicting requirements
  --> src/main.rs:20:12
   |
20 |         }).boxed();
   |            ^^^^^
   |
note: first, the lifetime cannot outlive the anonymous lifetime #1 defined on the method body at 14:3...
  --> src/main.rs:14:3
   |
14 | /   fn returnastream(self: &mut Self) -> Pin<Box<dyn Stream<Item=i32>>> {
15 | |     return async_std::stream::interval(Duration::from_millis(1000))
16 | |         .map(|_| {
17 | |           // change a value of Self within the stream
...  |
20 | |         }).boxed();
21 | |   }
   | |___^
note: ...so that the type `futures_util::stream::stream::map::Map<async_std::stream::interval::Interval, [closure@src/main.rs:16:14: 20:10 self:&mut &mut StreamProvider]>` will meet its required lifetime bounds
  --> src/main.rs:20:12
   |
20 |         }).boxed();
   |            ^^^^^
   = note: but, the lifetime must be valid for the static lifetime...
note: ...so that the expression is assignable
  --> src/main.rs:15:12
   |
15 |       return async_std::stream::interval(Duration::from_millis(1000))
   |  ____________^
16 | |         .map(|_| {
17 | |           // change a value of Self within the stream
18 | |           self.value = self.value + 1;
19 | |           1
20 | |         }).boxed();
   | |__________________^
   = note: expected  `std::pin::Pin<std::boxed::Box<(dyn futures_core::stream::Stream<Item = i32> + 'static)>>`
              found  `std::pin::Pin<std::boxed::Box<dyn futures_core::stream::Stream<Item = i32>>>`

error: aborting due to previous error

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
andre
  • 1,618
  • 2
  • 19
  • 38

1 Answer

0

Thanks to Peter Hall:

You need to use synchronisation primitives, for example Arc and Mutex, in order to access and mutate values that are owned by another thread.

This was the link I missed. Here's the result:

use std::time::Duration;

use async_std::pin::Pin;
use futures::{Stream, StreamExt};
use std::sync::{Arc, Mutex};

/// Holds a counter shared between its owner and the stream returned by `returnastream`.
struct StreamProvider {
    // Wrapped in `Arc<Mutex<..>>` so the stream's closure can own a clone of
    // the handle and still mutate the same counter.
    value: Arc<Mutex<u16>>,
}

impl StreamProvider {
    /// Returns a boxed stream on a 1000 ms interval that yields `1` per tick.
    ///
    /// Every tick also increments the shared counter. The closure owns its
    /// own `Arc` handle, so the returned stream does not borrow from `self`.
    fn returnastream(self: &mut Self) -> Pin<Box<dyn Stream<Item = i32>>> {
        let counter = Arc::clone(&self.value);
        let ticks = async_std::stream::interval(Duration::from_millis(1000));
        ticks
            .map(move |_| {
                // Bump the counter shared with `self`; the lock guard is a
                // temporary and is released when this statement finishes.
                *counter.lock().unwrap() += 1;
                1
            })
            .boxed()
    }
}

/// Entry point: builds a `StreamProvider` around a shared counter and prints
/// the counter each time the stream produces an item.
#[async_std::main]
async fn main() {
    let mut object = StreamProvider {
        value: Arc::new(Mutex::new(1)),
    };

    let mut worx = object.returnastream();
    // Subscribe to the items. Testing `is_some()` (rather than binding an
    // irrefutable `item` pattern) lets the loop end once the stream reports
    // `None` instead of polling a finished stream forever.
    while worx.next().await.is_some() {
        // The stream's closure has already incremented the counter by the
        // time the item arrives, so this prints the updated value.
        println!("{}", object.value.lock().unwrap());
    }
}
[dependencies]
futures = "0.3.6"
async-std = { version = "1.6.5", features = ["attributes", "unstable"] }
Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
andre
  • 1,618
  • 2
  • 19
  • 38