1

I've got the following method:

pub fn load_names(&self, req: &super::MagicQueryType) -> ::grpcio::Result<::grpcio::ClientSStreamReceiver<String>> {

My goal is to get the very first element of grpcio::ClientSStreamReceiver; I don't care about the other names:

let name: String = load_names(query)?.wait().nth(0)?;

It seems inefficient to call wait() before nth(0) as I believe wait() blocks the stream until it receives all the elements.

How can I write a more efficient solution (i.e., nth(0).wait()) without triggering build errors? Rust's build errors for futures::stream::Stream look extremely confusing to me.

The Rust playground doesn't support grpcio = "0.4.4" so I cannot provide a link.

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
James Larkin
  • 541
  • 5
  • 18

1 Answers1

4

To extract the first element of a futures::Stream in a blocking manner, you should convert the Stream to an iterator by calling executor::block_on_stream and then call Iterator::next.

use futures::{executor, stream, Stream}; // 0.3.4
use std::iter;

fn example() -> impl Stream<Item = i32> {
    stream::iter(iter::repeat(42))
}

fn main() {
    let v = executor::block_on_stream(example()).next();
    println!("{:?}", v);
}

If you are using Tokio, you can convert the Stream into a Future with StreamExt::into_future and annotate a function with #[tokio::main]:

use futures::{stream, Stream, StreamExt}; // 0.3.4
use std::iter;
use tokio; // 0.2.13

fn example() -> impl Stream<Item = i32> {
    stream::iter(iter::repeat(42))
}

#[tokio::main]
async fn just_one() -> Option<i32> {
    let (i, _stream) = example().into_future().await;
    i
}

fn main() {
    println!("{:?}", just_one());
}

See also:

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366