2

I have a Tokio application that should return when an error happens. This is implemented using one-shot channels shared between two tasks. When any of the tasks detect an error it signals the channel, which is received by the other task.

However even after the error-detecting task signals the channel the other task does not return -- the select! block simply doesn't realize that the channel is signalled. Full code:

#![recursion_limit = "256"]

extern crate futures;
extern crate tokio;
extern crate tokio_net;
extern crate tokio_sync;

use std::io::Write;
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::split::{TcpStreamReadHalf, TcpStreamWriteHalf};
use tokio::net::TcpStream;
use tokio_sync::oneshot;

use futures::select;

use futures::future::FutureExt;

#[derive(Debug)]
enum AppErr {
    CantConnect(std::io::Error),
}

fn main() {
    let executor = tokio::runtime::Runtime::new().unwrap();

    executor.spawn(async {
        match client_task().await {
            Ok(()) => {}
            Err(err) => {
                println!("Error: {:?}", err);
            }
        }
    });

    executor.shutdown_on_idle();
}

async fn client_task() -> Result<(), AppErr> {
    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    print!("Connecting... ");
    let _ = std::io::stdout().flush();
    let sock = TcpStream::connect(&addr)
        .await
        .map_err(AppErr::CantConnect)?;
    println!("Connected.");

    let (read, write) = sock.split();

    let (abort_in_task_snd, abort_in_task_rcv) = oneshot::channel();
    let (abort_out_task_snd, abort_out_task_rcv) = oneshot::channel();

    tokio::spawn(handle_incoming(read, abort_in_task_rcv, abort_out_task_snd));
    tokio::spawn(handle_outgoing(
        write,
        abort_out_task_rcv,
        abort_in_task_snd,
    ));

    Ok(())
}

async fn handle_incoming(
    mut conn: TcpStreamReadHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();

    loop {
        select! {
            abort_ret = abort_in_fused => {
                // TODO match abort_ret {..}
                println!("abort signalled, handle_incoming returning");
                return;
            },
            bytes = conn.read(&mut read_buf).fuse() => {
                match bytes {
                    Err(io_err) => {
                        println!("IO error when reading input stream: {:?}", io_err);
                        println!("Aborting");
                        abort_out.send(()).unwrap();
                        return;
                    }
                    Ok(bytes) => {
                        if bytes == 0 {
                            println!("Connection closed from the other end. Aborting.");
                            abort_out.send(()).unwrap();
                            return;
                        }
                        println!("Read {} bytes: {:?}", bytes, &read_buf[0..bytes]);
                    }
                }
            }
        }
    }
}

async fn handle_outgoing(
    mut conn: TcpStreamWriteHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    let mut stdin = tokio::io::stdin();
    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();

    loop {
        select! {
            abort_ret = abort_in_fused => {
                println!("Abort signalled, handle_outgoing returning");
                return;
            }
            input = stdin.read(&mut read_buf).fuse() => {
                match input {
                    Err(io_err) => {
                        println!("IO error when reading stdin: {:?}", io_err);
                        println!("Aborting");
                        abort_out.send(()).unwrap();
                        return;
                    }
                    Ok(bytes) => {
                        if bytes == 0 {
                            println!("stdin closed, aborting");
                            abort_out.send(()).unwrap();
                            return;
                        }
                        println!("handle_outgoing read {} bytes", bytes);
                        match conn.write_all(&read_buf[0..bytes]).await {
                            Ok(()) => {
                            },
                            Err(io_err) => {
                                println!("Error when sending: {:?}", io_err);
                                println!("Aborting");
                                abort_out.send(()).unwrap();
                                return;
                            }
                        }
                    }
                }
            },
        }
    }
}

Dependencies:

futures-preview = { version = "0.3.0-alpha.18",  features = ["async-await", "nightly"] }
tokio = "0.2.0-alpha.2"
tokio-net = "0.2.0-alpha.2"
tokio-sync = "0.2.0-alpha.2"

So when the connection is closed on the other side, or there's an error, I signal the channel:

println!("Connection closed from the other end. Aborting.");
abort_out.send(()).unwrap();
return;

But for some reason the other task never notices:

select! {
    // Never runs:
    abort_ret = abort_in_fused => {
        // TODO match abort_ret {..}
        println!("abort signalled, handle_incoming returning");
        return;
    },
    ...
}

This can be seen in two ways:

The print in abort_ret case never runs.

After the connection is closed on the other end the process prints Connection closed from the other end. Aborting. but it doesn't return. When I attach gdb I see this backtrace:

...
#14 0x000055f10391ab7e in tokio_executor::enter::Enter::block_on (self=0x7ffc167ba5a0, f=...) at /home/omer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.2.0-alpha.2/src/enter.rs:121
#15 0x000055f103863a28 in tokio::runtime::threadpool::Runtime::shutdown_on_idle (self=...) at /home/omer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.0-alpha.2/src/runtime/threadpool/mod.rs:219
#16 0x000055f10384c01b in chat0_client::main () at src/chat0_client.rs:36

So it's blocked in Tokio event loop.

In addition to a direct answer, I'm interested in pointers on how to debug this.

Thanks.

sinan
  • 6,809
  • 6
  • 38
  • 67
  • I don't have knowledge about debugging part but i have added **see also** section into my answer for your previous question because of this issue, in documentation they explicitly state relation between blocking annotated future with the combined futures. Please [check this](https://stackoverflow.com/a/56562541/1601571) – Ömer Erden Aug 26 '19 at 08:40
  • @ÖmerErden So `stdin.read(...).fuse()` blocks the thread? I don't understand -- that type (`Stdin`) is exported by Tokio so I'd expect it to just work with other Tokio/futures functions. It sucks that Tokio has so many footguns... – sinan Aug 26 '19 at 08:46
  • İt actually works but the problem in here stdin future never ends, so the combined future never gets polled, using other solutions instead of tokio stdin might be more flexible or implementing your own, you can check the link above in the comment, you may find some useful links – Ömer Erden Aug 26 '19 at 09:24

0 Answers0