
I have an object that produces a stream, and I would like to consume that stream by collecting its items. In my setup, however, I can only access the stream through a trait:

use futures::stream::Stream;
use futures::stream::{self, StreamExt};
use std::pin::Pin;

pub trait GetStream {
    fn get_stream(&self) -> Box<dyn Stream<Item = u32>>;
}

The streamer produces a stream that is not Unpin (in my case, because I use unfold). I know that in the following example I could have used another method, such as repeat_with, which produces a Box<dyn Stream<...> + Unpin>, and the case would be closed. In real life, however, I need unfold, so my stream is !Unpin:

pub struct Streamer {
    begin: u32,
    count: usize,
}

impl GetStream for Streamer {
    fn get_stream(&self) -> Box<dyn Stream<Item = u32>> {
        let stream = stream::unfold((self.begin, self.count), |(cur, count)| async move {
            if count == 0 {
                let next_state = (cur + 1, count - 1);
                let yielded = next_state.0;
                Some((yielded, next_state))
            } else {
                None
            }
        });

        Box::new(stream)
    }
}

On the consumer's side, I'd like to call collect on the stream, but that requires the stream to be Unpin:

#[tokio::main]
async fn main() {
    let streamer = Streamer { begin: 4, count: 4 };
    let stream = streamer.get_stream();
    let stream = Pin::new(stream);
    let vs = stream.collect::<Vec<_>>().await;
    println!("vs: {:?}", vs);
}

Is there a way to keep unfold() on the producer side, the trait in the middle, and collect() on the consumer side?

Things got a bit weird when I tried to use Pin<Box<dyn Stream>>: in get_stream, instead of returning Box::new(stream), I returned stream.boxed(). That compiles, but it prints an empty stream!?

This is related to a previous question about impl stream cannot be unpinned, but that solution doesn't help my case because it won't let me pin something which is !Unpin.

eggyal
  • Removed the [tag:pinning] tag, which relates to the pinning of X509 certificates (nothing to do with pinning in Rust). – eggyal Jun 16 '21 at 14:50
  • The code does not compile at the line `let stream = Pin::new(stream)`. But if you remove that line, then the compiler complains that you can't call collect (no pun intended) on the stream, because the stream is `!Unpin`. – user3166886 Jun 16 '21 at 15:14
  • The stream returned by `unfold` can't be `Unpin` because the environment captured by the closure argument could be referenced within the closure, and moving the resulting stream at an `await` point would leave those references dangling. In this case, since the closure does not actually need to be asynchronous, you could use `stream::iter` together with `std::iter::from_fn`. – eggyal Jun 16 '21 at 15:23
  • @eggyal, you are correct, as I tried to give a simple example, which does not justify the `Stream::unfold()` method. In my application, however, which deals with paginated searches in Elasticsearch, I do need that async environment, and that complicates things a bit. – user3166886 Jun 16 '21 at 15:27
  • If you know that either: (i) your closure does not hold references to the captured environment across any await points; or (ii) `collect` does not move the stream... then you could work around this with unsafe code. But I'd probably prefer to implement my own stream type instead of using unfold. – eggyal Jun 16 '21 at 15:36
  • Actually, maybe you can make use of [`poll_fn`](https://docs.rs/futures/0.3.15/futures/stream/fn.poll_fn.html) here? – eggyal Jun 16 '21 at 16:36

1 Answer


Your example prints an empty array because of a logic error. You initialize the streamer with count: 4:

    let streamer = Streamer { begin: 4, count: 4 };

But later you return None (i.e. "end of stream") when count != 0:

if count == 0 { // this case is never "true"
    // skipped for brevity
} else { // count != 0 -> this is always "true", so your app always returns None
    None
}
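
To see the effect of the condition in isolation, here is a minimal synchronous sketch that uses only the standard library. It mirrors the state handling of the unfold closure with std::iter::from_fn; the helper name unfold_sync and its keep_going parameter are made up for illustration and are not part of the futures crate or of the original code.

```rust
use std::iter;

/// Synchronous stand-in for the `stream::unfold` closure (hypothetical helper).
/// `keep_going` decides whether another item is produced for the remaining count.
fn unfold_sync(begin: u32, count: usize, keep_going: fn(usize) -> bool) -> Vec<u32> {
    let mut state = (begin, count);
    iter::from_fn(move || {
        let (cur, count) = state;
        if keep_going(count) {
            // `checked_sub` ends the sequence instead of underflowing when count is 0.
            let next_state = (cur + 1, count.checked_sub(1)?);
            state = next_state;
            Some(next_state.0)
        } else {
            None
        }
    })
    .collect()
}

fn main() {
    // Condition from the question: false for count = 4, so nothing is yielded.
    let buggy = unfold_sync(4, 4, |count| count == 0);
    // Opposite condition: items are yielded until the count reaches zero.
    let fixed = unfold_sync(4, 4, |count| count > 0);
    println!("buggy: {:?}, fixed: {:?}", buggy, fixed);
    // prints "buggy: [], fixed: [5, 6, 7, 8]"
}
```

Note that with the question's condition, a streamer created with count: 0 would enter the branch that computes count - 1 on a usize, which panics in a debug build.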

The fix is to invert the condition of your if statement. Full working source:

use std::pin::Pin;

use futures::stream::{self, Stream, StreamExt};

#[tokio::main]
async fn main() {
    let streamer = Streamer { begin: 4, count: 4 };
    let stream = streamer.get_stream();
    let vs = stream.collect::<Vec<_>>().await;
    println!("vs: {:?}", vs);
}

pub trait GetStream {
    fn get_stream(&self) -> Pin<Box<dyn Stream<Item=u32>>>;
}

pub struct Streamer {
    begin: u32,
    count: usize,
}

impl GetStream for Streamer {
    fn get_stream(&self) -> Pin<Box<dyn Stream<Item = u32>>> {
        let stream = stream::unfold((self.begin, self.count), |(cur, count)| async move {
            if count > 0 {
                let next_state = (cur + 1, count - 1);
                let yielded = next_state.0;
                Some((yielded, next_state))
            } else {
                None
            }
        });

        stream.boxed()
    }
}
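
If the trait has to keep returning a plain Box<dyn Stream<...>>, the pinning can also happen on the consumer side. The sketch below shows the mechanism with a boxed Future rather than a Stream, so that it runs with the standard library alone; make_future, poll_once and NoopWaker are hypothetical names introduced only for this illustration.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough to poll a future that never actually waits.
struct NoopWaker;

impl Wake for NoopWaker {
    fn wake(self: Arc<Self>) {}
}

/// Hypothetical producer: returns an unpinned trait object, analogous to the
/// question's `Box<dyn Stream<Item = u32>>`. Futures from `async` blocks are `!Unpin`.
fn make_future(begin: u32) -> Box<dyn Future<Output = u32>> {
    Box::new(async move { begin + 1 })
}

/// Consumer side: pin the box, then poll it once.
fn poll_once(boxed: Box<dyn Future<Output = u32>>) -> Poll<u32> {
    // `Pin::new(boxed)` would not compile: it requires the pointee to be `Unpin`.
    // `Pin::from` (or `Box::into_pin`) accepts any pointee, because the value
    // already lives on the heap and is never moved out of its allocation.
    let mut pinned: Pin<Box<dyn Future<Output = u32>>> = Pin::from(boxed);
    let waker = Waker::from(Arc::new(NoopWaker));
    let mut cx = Context::from_waker(&waker);
    pinned.as_mut().poll(&mut cx)
}

fn main() {
    // The async block has no await point, so it completes on the first poll.
    println!("{:?}", poll_once(make_future(4))); // prints "Ready(5)"
}
```

The same conversion should apply to the stream from the question: with the original trait signature, Pin::from(streamer.get_stream()).collect::<Vec<_>>().await pins the box without requiring the inner stream to be Unpin, since Pin<Box<dyn Stream>> itself implements Stream.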
Svetlin Zarev