0

How do I make a function that:

  1. listens to a stream,
  2. if the stream doesn't yield an item for 2 seconds, cancels the stream and terminates?

Test setup; it should terminate after 2 seconds of no stdin input:

use std::time::Duration;
use futures::future::select;
use futures::future::Either;
use futures::stream::{Stream, StreamExt};
use tokio::io::AsyncBufReadExt;
use tokio::time::Instant;
const TWO_SECONDS: Duration = Duration::from_millis(2_000);

#[tokio::main]
async fn main() {
    use tokio::io;
    let lines =
        tokio_stream::wrappers::LinesStream::new(io::BufReader::new(io::stdin()).lines()).fuse();
    terminate_after_2_seconds_of_no_items(lines).await;
}

Attempt #1: Doesn't work; when the timer expires the overall program still waits for 1 more line before terminating.

async fn terminate_after_2_seconds_of_no_items<S, T>(mut stream: S)
where
    S: Stream<Item = T> + Unpin,
{
    let mut timer = Box::pin(tokio::time::sleep(TWO_SECONDS));
    let mut next_fut = stream.next();
    loop {
        let res = select(&mut next_fut, &mut timer).await;
        match res {
            Either::Left(Some(_)) => {
                next_fut = stream.next();
                timer.as_mut().reset(Instant::now() + TWO_SECONDS);
                println!("received item, timer reset");
            },
            Either::Left(None) => {
                println!("stream ended, terminating");
                break;
            }
            Either::Right(_) => {
                println!("timer expired, too bad, quitting");
                break;
            }
        }
    }
}

Attempt #2: Same problem as Attempt #1.

async fn terminate_after_2_seconds_of_no_items_2<S, T>(mut stream: S)
where
    S: Stream<Item = T> + Unpin,
{
    let mut next_fut = stream.next();
    loop {
        let res = tokio::time::timeout(TWO_SECONDS, &mut next_fut).await;
        match res {
            Ok(Some(_)) => { 
                next_fut = stream.next();
                println!("received item, timer reset");
            },
            Ok(None) => {
                println!("stream ended, terminating");
                break;
            },
            Err(_) => {
                println!("timer expired, too bad, quitting");
                break;
            },
        }
    }
}

Attempt #3: Same problem as Attempt #1.

async fn terminate_after_2_seconds_of_no_items_3<S, T>(stream: S)
where
    S: Stream<Item = T> + Unpin,
{
    use tokio_stream::StreamExt;
    let timed_stream = stream.timeout(TWO_SECONDS);
    tokio::pin!(timed_stream);
    while let Ok(x) = timed_stream.try_next().await {
        match x {
            Some(_) => { 
                println!("received item, timer reset");
            },
            None => {
                println!("stream ended, terminating");
                break;
            },
        }
    }
    println!("timer expired, too bad, quitting");
}

I don't believe I can use futures::stream::select and make the timer into a stream, since FutureExt::into_stream() would consume the timer, but I need to reset the timer later.

Is this a fundamental problem with tokio::io::Lines, or am I getting something wrong?

virchau13
  • 1,175
  • 7
  • 19
  • 2
    You are probably running into the consequences of implementation details documented here: [`tokio::io::Stdin`](https://docs.rs/tokio/latest/tokio/io/struct.Stdin.html) *"For technical reasons, stdin is implemented by using an ordinary blocking read on a separate thread, and it is impossible to cancel that read. This can make shutdown of the runtime hang until the user presses enter."* – kmdreko Mar 09 '23 at 04:20

2 Answers2

2

I discovered the same thing @kmdreko says in their comment: tokio's stdin is secretly blocking:

This handle is best used for non-interactive uses, such as when a file is piped into the application. For technical reasons, stdin is implemented by using an ordinary blocking read on a separate thread, and it is impossible to cancel that read. This can make shutdown of the runtime hang until the user presses enter.

For interactive uses, it is recommended to spawn a thread dedicated to user input and use blocking IO directly in that thread.

I cleaned up your code (don't need tokio-stream if you're just turning it back into a Future) and moved to a manually created runtime so you can tell what's going on. And it's pretty unexpected. (playground)

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let lines = BufReader::new(stdin()).lines();

    rt.block_on(terminate_after_2_seconds_of_no_items(lines));

    println!("Waiting for runtime to end");

    drop(rt);

    println!("Runtime finished");
}

rt.block_on runs as expected. If you wait two seconds, the first println occurs immediately. Then you have to send a newline or close stdin, after which the second println occurs. The blocking part happens when tokio waits for its threadpool to finish executing the current futures.

Since this only happens when the runtime is dropped, and that usually means you are done with the entire program, there is a really simple solution: end the process.

fn main() {
    let rt = tokio::runtime::Runtime::new().unwrap();
    let lines = BufReader::new(stdin()).lines();

    rt.block_on(terminate_after_2_seconds_of_no_items(lines));

    println!("Waiting for runtime to end");

    std::process::exit(0);
}

Stopping a thread externally is surprisingly difficult. Likewise, there doesn't seem to be a way to read from stdin without blocking or going into raw mode (there's even a comment on that post with the same question as you). So for now, this is probably the best solution.

drewtato
  • 6,783
  • 1
  • 12
  • 17
1

@kmdreko and @drewtato are correct as to the root cause of this issue: all three functions work, it's just that Tokio's default stdin uses blocking reads and can't be cancelled ahead of time.

To me this is a very strange default, so I just wrote a quick nonblocking version (ironically mostly copied from the Tokio docs):

mod better {
    use std::{
        fs::File,
        io::{self, Read},
        os::fd::{FromRawFd, RawFd},
        pin::Pin,
        task::{Context, Poll},
    };
    use futures::ready;
    use tokio::io::{unix::AsyncFd, AsyncRead, ReadBuf};

    // Copied without modification from https://github.com/anowell/nonblock-rs/blob/7685f3060ce9b5dc242847706b541ae46f27340b/src/lib.rs#L179
    fn set_blocking(fd: RawFd, blocking: bool) -> io::Result<()> {
        use libc::{fcntl, F_GETFL, F_SETFL, O_NONBLOCK};
        let flags = unsafe { fcntl(fd, F_GETFL, 0) };
        if flags < 0 {
            return Err(io::Error::last_os_error());
        }

        let flags = if blocking {
            flags & !O_NONBLOCK
        } else {
            flags | O_NONBLOCK
        };
        let res = unsafe { fcntl(fd, F_SETFL, flags) };
        if res != 0 {
            return Err(io::Error::last_os_error());
        }

        Ok(())
    }

    pub struct Stdin {
        inner: AsyncFd<File>,
    }

    // Copied without modification from https://docs.rs/tokio/1.26.0/tokio/io/unix/struct.AsyncFd.html#examples
    impl AsyncRead for Stdin {
        fn poll_read(
            self: Pin<&mut Self>,
            cx: &mut Context<'_>,
            buf: &mut ReadBuf<'_>,
        ) -> Poll<io::Result<()>> {
            loop {
                let mut guard = ready!(self.inner.poll_read_ready(cx))?;

                let unfilled = buf.initialize_unfilled();
                match guard.try_io(|inner| inner.get_ref().read(unfilled)) {
                    Ok(Ok(len)) => {
                        buf.advance(len);
                        return Poll::Ready(Ok(()));
                    }
                    Ok(Err(err)) => return Poll::Ready(Err(err)),
                    Err(_would_block) => continue,
                }
            }
        }
    }

    pub fn stdin() -> Result<Stdin, std::io::Error> {
        let stdin_fd = unsafe { File::from_raw_fd(0) };
        set_blocking(0, false)?;
        Ok(Stdin {
            inner: AsyncFd::new(stdin_fd)?,
        })
    }
}

This works perfectly:

#[tokio::main]
async fn main() {
    use tokio::io;
    let stdin = better::stdin().expect("stdin initialization failed");
    let stdin = io::BufReader::new(stdin);
    let lines = tokio_stream::wrappers::LinesStream::new(stdin.lines());
    terminate_after_2_seconds_of_no_items(lines).await;
}
% cargo run -q
> inp
received item, timer reset
> input 2
received item, timer reset
timer expired, too bad, quitting

The main problem with this solution is that it sets stdin to nonblocking, so it will probably break any other code that reads from stdin. As long as you read stdin exclusively from the Stdin handle, though, it will be fine.

virchau13
  • 1,175
  • 7
  • 19