
I want to stop reading from a tokio::io::lines stream. I combined it with a oneshot future using select2 and completed the oneshot, but tokio::run kept running.

use futures::{sync::oneshot, *}; // 0.1.27
use std::{io::BufReader, time::Duration};
use tokio::prelude::*; // 0.1.21

fn main() {
    let (tx, rx) = oneshot::channel::<()>();
    let lines = tokio::io::lines(BufReader::new(tokio::io::stdin()));
    let lines = lines.for_each(|item| {
        println!("> {:?}", item);
        Ok(())
    });

    std::thread::spawn(move || {
        std::thread::sleep(Duration::from_millis(5000));
        println!("system shutting down");
        let _ = tx.send(());
    });

    let lines = lines.select2(rx);

    tokio::run(lines.map(|_| ()).map_err(|_| ()));
}

How can I stop reading from this?

Shepmaster
nkkr

2 Answers


There's nothing wrong with your strategy, but it only works with futures that don't run a blocking operation through Tokio's blocking function (the traditional kind of blocking should never be done inside a future at all).

You can test this by replacing the tokio::io::lines(..) future with a simple interval future:

let lines = Interval::new(Instant::now(), Duration::from_secs(1));

The problem is that tokio::io::Stdin internally uses tokio_threadpool::blocking.

From the documentation of tokio_threadpool::blocking (emphasis mine):

NB: The entire task that called blocking is blocked whenever the supplied closure blocks, even if you have used future combinators such as select - the other futures in this task will not make progress until the closure returns. If this is not desired, ensure that blocking runs in its own task (e.g. using futures::sync::oneshot::spawn).

Since this blocks every other future in the combinator, your Receiver cannot get the signal from the Sender until the blocking call returns.
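The effect can be modeled without Tokio at all. The following is only a rough sketch using std threads and channels: time_to_notice_shutdown is a hypothetical helper, and thread::sleep stands in for the blocking stdin read. It is not how Tokio is implemented; it only shows that a shutdown signal sent early is not observed until the blocking step returns.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical model of a single task whose step blocks (as a stdin read would).
/// Returns how long it took the task to notice the shutdown signal.
fn time_to_notice_shutdown(block_for: Duration, signal_after: Duration) -> Duration {
    let (tx, rx) = mpsc::channel::<()>();
    let start = Instant::now();

    // Plays the role of the thread that calls tx.send(()) in the question.
    thread::spawn(move || {
        thread::sleep(signal_after);
        let _ = tx.send(());
    });

    loop {
        // Simulated blocking read: nothing else in this "task" runs meanwhile.
        thread::sleep(block_for);
        // Only now does the task get a chance to look at the shutdown channel.
        match rx.try_recv() {
            Ok(()) | Err(mpsc::TryRecvError::Disconnected) => return start.elapsed(),
            Err(mpsc::TryRecvError::Empty) => continue,
        }
    }
}

fn main() {
    // The signal is sent after 50 ms, but the task is stuck in its 300 ms step.
    let noticed = time_to_notice_shutdown(Duration::from_millis(300), Duration::from_millis(50));
    println!("shutdown noticed after {:?}", noticed);
}
```

In the question's program the blocking step is a read from stdin, which may never return if no input arrives, so the signal may never be observed.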

Please see How can I read non-blocking from stdin?, or use tokio-stdin-stdout, which creates a channel to consume data from a dedicated stdin thread. It also has a line-by-line example.
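The channel idea can be sketched with the standard library alone. This is not the tokio-stdin-stdout API; spawn_line_reader and for_each_line_until are hypothetical helpers, and the 5-second deadline simply mirrors the sleep in the question. The blocking read lives on its own thread, so the consumer can give up at the deadline even if the reader is still stuck waiting for input.

```rust
use std::io::{self, BufRead};
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::{Duration, Instant};

/// Hypothetical helper: read lines on a dedicated thread and forward them.
fn spawn_line_reader<R>(reader: R) -> Receiver<String>
where
    R: BufRead + Send + 'static,
{
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        for line in reader.lines() {
            match line {
                Ok(line) => {
                    // Stop forwarding once the consumer has gone away.
                    if tx.send(line).is_err() {
                        break;
                    }
                }
                Err(_) => break,
            }
        }
    });
    rx
}

/// Hypothetical helper: handle lines until the deadline passes or input ends.
/// Returns the number of lines handled.
fn for_each_line_until<F>(lines: &Receiver<String>, deadline: Instant, mut f: F) -> usize
where
    F: FnMut(String),
{
    let mut count = 0;
    loop {
        let now = Instant::now();
        if now >= deadline {
            break;
        }
        match lines.recv_timeout(deadline - now) {
            Ok(line) => {
                f(line);
                count += 1;
            }
            // Either the deadline passed or the reader thread finished.
            Err(_) => break,
        }
    }
    count
}

fn main() {
    let lines = spawn_line_reader(io::BufReader::new(io::stdin()));
    let deadline = Instant::now() + Duration::from_secs(5);
    for_each_line_until(&lines, deadline, |line| println!("> {:?}", line));
    println!("system shutting down");
    // Returning from main ends the process even if the reader thread is
    // still blocked on stdin.
}
```

The trade-off is that the reader thread is never cancelled; it is simply abandoned when the process exits, which is usually acceptable for stdin.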

Shepmaster
Ömer Erden

Thank you for your comment and for correcting my sentences.

I tried stopping this non-blocking future, and it worked:

let lines = Interval::new(Instant::now(), Duration::from_secs(1));

My understanding is that wrapping the blocking Future with tokio_threadpool::blocking would work in this case. I'll try it later.

Thank you very much.

nkkr
  • Actually, the idea is: inside a `Future`, wrap blocking behavior with Tokio's `blocking`; otherwise, don't block inside a `Future` at all. The one thing to know is that the closure you wrap with Tokio's `blocking` is **permitted to block indefinitely**, and in your case that's what `Stdin` does. – Ömer Erden Jun 13 '19 at 14:21
  • Thanks a lot! Now that I understand my way of thinking, I will study more. – nkkr Jun 13 '19 at 22:28