How do I make a function that:
- listens to a stream,
- if the stream doesn't yield an item for 2 seconds, cancels the stream and terminates?
Test setup; it should terminate after 2 seconds of no stdin input:
use std::time::Duration;

use futures::future::select;
use futures::future::Either;
use futures::stream::{Stream, StreamExt};
use tokio::io::AsyncBufReadExt;
use tokio::time::Instant;

const TWO_SECONDS: Duration = Duration::from_millis(2_000);

#[tokio::main]
async fn main() {
    use tokio::io;

    let lines =
        tokio_stream::wrappers::LinesStream::new(io::BufReader::new(io::stdin()).lines()).fuse();
    terminate_after_2_seconds_of_no_items(lines).await;
}
Attempt #1: Doesn't work. When the timer expires, the program as a whole still waits for one more line on stdin before terminating.
async fn terminate_after_2_seconds_of_no_items<S, T>(mut stream: S)
where
    S: Stream<Item = T> + Unpin,
{
    let mut timer = Box::pin(tokio::time::sleep(TWO_SECONDS));
    let mut next_fut = stream.next();
    loop {
        let res = select(&mut next_fut, &mut timer).await;
        match res {
            Either::Left(Some(_)) => {
                next_fut = stream.next();
                timer.as_mut().reset(Instant::now() + TWO_SECONDS);
                println!("received item, timer reset");
            }
            Either::Left(None) => {
                println!("stream ended, terminating");
                break;
            }
            Either::Right(_) => {
                println!("timer expired, too bad, quitting");
                break;
            }
        }
    }
}
Attempt #2: Same problem as Attempt #1.
async fn terminate_after_2_seconds_of_no_items_2<S, T>(mut stream: S)
where
    S: Stream<Item = T> + Unpin,
{
    let mut next_fut = stream.next();
    loop {
        let res = tokio::time::timeout(TWO_SECONDS, &mut next_fut).await;
        match res {
            Ok(Some(_)) => {
                next_fut = stream.next();
                println!("received item, timer reset");
            }
            Ok(None) => {
                println!("stream ended, terminating");
                break;
            }
            Err(_) => {
                println!("timer expired, too bad, quitting");
                break;
            }
        }
    }
}
Attempt #3: Same problem as Attempt #1.
async fn terminate_after_2_seconds_of_no_items_3<S, T>(stream: S)
where
    S: Stream<Item = T> + Unpin,
{
    use tokio_stream::StreamExt;

    let timed_stream = stream.timeout(TWO_SECONDS);
    tokio::pin!(timed_stream);
    // `try_next` yields Err(Elapsed) when no item arrives within TWO_SECONDS.
    while let Ok(x) = timed_stream.try_next().await {
        match x {
            Some(_) => {
                println!("received item, timer reset");
            }
            None => {
                println!("stream ended, terminating");
                // Return rather than break, so the timeout message below
                // is not also printed when the stream simply ends.
                return;
            }
        }
    }
    println!("timer expired, too bad, quitting");
}
I don't believe I can use futures::stream::select and make the timer into a stream, since FutureExt::into_stream() would consume the timer, but I need to reset the timer later.
Is this a fundamental problem with tokio::io::Lines, or am I getting something wrong?
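One way to separate the two possibilities is to run the same "quit after an idle period" logic against a source that is not backed by stdin. The sketch below is a synchronous analog, not the async Stream code above: it swaps in std::sync::mpsc::Receiver::recv_timeout from the standard library, and the names drain_until_idle and Outcome are hypothetical, made up for this illustration. It may be relevant that the tokio documentation for tokio::io::stdin notes that stdin is read with an ordinary blocking read on a separate thread which cannot be cancelled, and that this can make runtime shutdown hang until the user presses enter.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Why the receive loop stopped (hypothetical helper type for this sketch).
#[derive(Debug, PartialEq)]
enum Outcome {
    /// All senders were dropped: the analog of the stream ending.
    Ended,
    /// No item arrived within the idle window.
    TimedOut,
}

/// Receives items until the channel closes or `idle` passes without an item.
/// Returns how many items were received and why the loop stopped.
fn drain_until_idle<T>(rx: &Receiver<T>, idle: Duration) -> (usize, Outcome) {
    let mut count = 0;
    loop {
        // Every call starts a fresh wait, so each received item
        // effectively resets the timer.
        match rx.recv_timeout(idle) {
            Ok(_) => count += 1,
            Err(RecvTimeoutError::Disconnected) => return (count, Outcome::Ended),
            Err(RecvTimeoutError::Timeout) => return (count, Outcome::TimedOut),
        }
    }
}
```

If this were fed from a dedicated thread that reads std::io::stdin() line by line and sends each line into the channel, that thread could still be blocked in a read when the timeout fires. Because a Rust process exits when main returns, the blocked reader thread should not keep the program alive in that arrangement. This is an assumption about how the pieces would be wired together, not something tested against the setup in the question.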