1

If I want to combine multiple same-typed streams into one, I would use Stream::select:

let combined = first_stream.select(second_stream)

However, once one of the streams is exhausted, the other can still produce results for the combined stream. What can I use to exhaust the combined stream once either of the underlying streams is exhausted?

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
Shishkin Pavel
  • 351
  • 2
  • 19

1 Answers1

3

Write your own stream combinator:

use futures::{Async, Poll, Stream}; // 0.1.25

struct WhileBoth<S1, S2>(S1, S2)
where
    S1: Stream,
    S2: Stream<Item = S1::Item, Error = S1::Error>;

impl<S1, S2> Stream for WhileBoth<S1, S2>
where
    S1: Stream,
    S2: Stream<Item = S1::Item, Error = S1::Error>,
{
    type Item = S1::Item;
    type Error = S1::Error;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
        match self.0.poll() {
            // Return errors or ready values (including the `None`
            // that indicates the stream is empty) immediately.
            r @ Err(_) | r @ Ok(Async::Ready(_)) => r,
            // If the first stream is not ready, try the second one.
            Ok(Async::NotReady) => self.1.poll(),
        }
    }
}

See also:

Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366