I have an object that produces a stream. I would like to use that stream by collecting its items. But, in my settings, I can only use that stream through a trait:
use futures::stream::Stream;
use futures::stream::{self, StreamExt};
use std::pin::Pin;
pub trait GetStream {
fn get_stream(&self) -> Box<dyn Stream<Item = u32>>;
}
The streamer produces a stream that is NOT Unpin
, (in my case, because I use unfold). I know that, in the following example, I could have used another method, such as repeat_with
, and that produce a <dyn Box<Stream<...> + Unpin>
and the case is closed... but no, in real life, I need that unfold
, so my Stream is !Unpin
:
pub struct Streamer {
begin: u32,
count: usize,
}
impl GetStream for Streamer {
fn get_stream(&self) -> Box<dyn Stream<Item = u32>> {
let stream = stream::unfold((self.begin, self.count), |(cur, count)| async move {
if count == 0 {
let next_state = (cur + 1, count - 1);
let yielded = next_state.0;
Some((yielded, next_state))
} else {
None
}
});
Box::new(stream)
}
}
On the user's side, I'd like to use collect
on the Stream, but that does require the stream to be Unpin
:
#[tokio::main]
async fn main() {
let streamer = Streamer { begin: 4, count: 4 };
let stream = streamer.get_stream();
let stream = Pin::new(stream);
let vs = stream.collect::<Vec<_>>().await;
println!("vs: {:?}", vs);
}
Is there a way to get the unfold()
on the producer side, the trait in the middle, and the collect()
in the consumer side?
Things got a bit weird when I tried to use Pin<Box<dyn Stream>>
, and in the function get_stream
, instead of returning Box::new(stream)
, I would return stream.boxed()
... It does compile, but it prints an empty stream!?
This is related to a previous question about impl stream cannot be unpinned, but that solution doesn't help my case because it won't let me pin something which is !Unpin
.