
I have an iterator of futures::channel::mpsc::UnboundedReceiver<T>. I want to handle every message from each receiver, one at a time, while also handling other futures.

This should be possible by looping over a futures::select!, but I need some way of getting a resolved value from each UnboundedReceiver<T>. I tried futures::future::select_all(Iter), but this fails to compile with the error: futures::channel::mpsc::UnboundedReceiver<T> is not a future.

Playground example is here.

Florian sp1rit

1 Answer


futures::channel::mpsc::UnboundedReceiver implements Stream, not Future, which is why futures::future::select_all rejects it. Instead, create a SelectAll by calling futures::stream::select_all(recv), then resolve to the next ready message by calling select_all.next() (provided by StreamExt). I adapted your example to use it:

use futures::{channel::mpsc, stream::{self, StreamExt, select_all}}; // 0.3.8
use tokio; // 1.0.1

#[tokio::main]
async fn main() -> failure::Fallible<()> {
    let mut recv = Vec::new();
    let mut futures = stream::FuturesUnordered::new();
    for _i in 0..3 {
        let (tx, rx) = mpsc::unbounded();
        recv.push(rx);
        futures.push(tokio::spawn(async move {
            tokio::time::sleep(std::time::Duration::from_millis(3000)).await;
            tx.unbounded_send("Message").unwrap();
        }));
    }
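    // Merge all receivers into a single stream that yields whichever message is ready.
    let mut select_all = select_all(recv);
    loop {
        futures::select! {
            // Next message from any of the receivers.
            msg = select_all.next() => {
                println!("{:#?}", msg);
            }
            // One of the spawned tasks has finished.
            _ = futures.select_next_some() => {
                eprintln!("Thread died");
            },
            // Every branch above has terminated.
            complete => break
        }
    }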
    Ok(())
}

Note that this is asynchronous programming rather than multithreading: you spawn asynchronous tokio tasks instead of threads. I recommend reading the answers here: What is the difference between asynchronous programming and multithreading?
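
As a rough mental model of what merging the receivers with futures::stream::select_all achieves, here is a std-only sketch. MergedReceivers, its poll_next method, and demo are hypothetical names invented for this illustration. The sketch uses std::sync::mpsc and busy polling with try_recv, whereas the real SelectAll is driven by wakers, so treat it as an analogy and not as the crate's implementation.

```rust
use std::sync::mpsc::{channel, Receiver, TryRecvError};
use std::task::Poll;

/// Hypothetical stand-in for `futures::stream::SelectAll` (illustration only).
struct MergedReceivers<T> {
    receivers: Vec<Receiver<T>>,
}

impl<T> MergedReceivers<T> {
    fn new(receivers: Vec<Receiver<T>>) -> Self {
        Self { receivers }
    }

    /// One non-blocking pass over the receivers, loosely mirroring `Stream::poll_next`:
    /// `Ready(Some(msg))` for the first message found, `Ready(None)` once every
    /// receiver is closed and drained, and `Pending` otherwise.
    fn poll_next(&mut self) -> Poll<Option<T>> {
        let mut i = 0;
        while i < self.receivers.len() {
            match self.receivers[i].try_recv() {
                Ok(msg) => return Poll::Ready(Some(msg)),
                Err(TryRecvError::Empty) => i += 1,
                Err(TryRecvError::Disconnected) => {
                    // Sender dropped and queue drained: forget this receiver.
                    // `swap_remove` moves the last receiver into slot `i`,
                    // so `i` is deliberately not advanced.
                    self.receivers.swap_remove(i);
                }
            }
        }
        if self.receivers.is_empty() {
            Poll::Ready(None)
        } else {
            Poll::Pending
        }
    }
}

/// Builds three channels with one message each and drains them through the merge.
fn demo() -> Vec<String> {
    let mut receivers = Vec::new();
    for id in 0..3 {
        let (tx, rx) = channel();
        tx.send(format!("Message {}", id)).unwrap();
        receivers.push(rx);
        // `tx` is dropped here, which closes the channel after its one message.
    }
    let mut merged = MergedReceivers::new(receivers);
    let mut received = Vec::new();
    loop {
        match merged.poll_next() {
            Poll::Ready(Some(msg)) => received.push(msg),
            Poll::Ready(None) => break,
            // Nothing ready yet; a real executor would park the task until woken.
            Poll::Pending => std::thread::yield_now(),
        }
    }
    received
}

fn main() {
    for msg in demo() {
        println!("{}", msg);
    }
}
```

The order in which messages come out depends on which receiver happens to be ready, so code consuming a merged stream should not rely on it.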

Yannick Funk