I have a Tokio application that should return when an error happens. This is implemented using one-shot channels shared between two tasks. When either task detects an error, it signals its channel, and the other task is supposed to receive that signal and return.
However, even after the error-detecting task signals the channel, the other task does not return: the select! block simply doesn't realize that the channel is signalled. Full code:
#![recursion_limit = "256"]
extern crate futures;
extern crate tokio;
extern crate tokio_net;
extern crate tokio_sync;
use std::io::Write;
use std::net::SocketAddr;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::net::tcp::split::{TcpStreamReadHalf, TcpStreamWriteHalf};
use tokio::net::TcpStream;
use tokio_sync::oneshot;
use futures::select;
use futures::future::FutureExt;
#[derive(Debug)]
enum AppErr {
    CantConnect(std::io::Error),
}
fn main() {
    let executor = tokio::runtime::Runtime::new().unwrap();
    executor.spawn(async {
        match client_task().await {
            Ok(()) => {}
            Err(err) => {
                println!("Error: {:?}", err);
            }
        }
    });
    executor.shutdown_on_idle();
}
async fn client_task() -> Result<(), AppErr> {
    let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
    print!("Connecting... ");
    let _ = std::io::stdout().flush();
    let sock = TcpStream::connect(&addr)
        .await
        .map_err(AppErr::CantConnect)?;
    println!("Connected.");
    let (read, write) = sock.split();
    let (abort_in_task_snd, abort_in_task_rcv) = oneshot::channel();
    let (abort_out_task_snd, abort_out_task_rcv) = oneshot::channel();
    tokio::spawn(handle_incoming(read, abort_in_task_rcv, abort_out_task_snd));
    tokio::spawn(handle_outgoing(
        write,
        abort_out_task_rcv,
        abort_in_task_snd,
    ));
    Ok(())
}
async fn handle_incoming(
    mut conn: TcpStreamReadHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();
    loop {
        select! {
            abort_ret = abort_in_fused => {
                // TODO match abort_ret {..}
                println!("abort signalled, handle_incoming returning");
                return;
            },
            bytes = conn.read(&mut read_buf).fuse() => {
                match bytes {
                    Err(io_err) => {
                        println!("IO error when reading input stream: {:?}", io_err);
                        println!("Aborting");
                        abort_out.send(()).unwrap();
                        return;
                    }
                    Ok(bytes) => {
                        if bytes == 0 {
                            println!("Connection closed from the other end. Aborting.");
                            abort_out.send(()).unwrap();
                            return;
                        }
                        println!("Read {} bytes: {:?}", bytes, &read_buf[0..bytes]);
                    }
                }
            }
        }
    }
}
async fn handle_outgoing(
    mut conn: TcpStreamWriteHalf,
    abort_in: oneshot::Receiver<()>,
    abort_out: oneshot::Sender<()>,
) {
    let mut stdin = tokio::io::stdin();
    let mut read_buf: [u8; 1024] = [0; 1024];
    let mut abort_in_fused = abort_in.fuse();
    loop {
        select! {
            abort_ret = abort_in_fused => {
                println!("Abort signalled, handle_outgoing returning");
                return;
            }
            input = stdin.read(&mut read_buf).fuse() => {
                match input {
                    Err(io_err) => {
                        println!("IO error when reading stdin: {:?}", io_err);
                        println!("Aborting");
                        abort_out.send(()).unwrap();
                        return;
                    }
                    Ok(bytes) => {
                        if bytes == 0 {
                            println!("stdin closed, aborting");
                            abort_out.send(()).unwrap();
                            return;
                        }
                        println!("handle_outgoing read {} bytes", bytes);
                        match conn.write_all(&read_buf[0..bytes]).await {
                            Ok(()) => {
                            },
                            Err(io_err) => {
                                println!("Error when sending: {:?}", io_err);
                                println!("Aborting");
                                abort_out.send(()).unwrap();
                                return;
                            }
                        }
                    }
                }
            },
        }
    }
}
Dependencies:
futures-preview = { version = "0.3.0-alpha.18", features = ["async-await", "nightly"] }
tokio = "0.2.0-alpha.2"
tokio-net = "0.2.0-alpha.2"
tokio-sync = "0.2.0-alpha.2"
So when the connection is closed on the other side, or there's an error, I signal the channel:
println!("Connection closed from the other end. Aborting.");
abort_out.send(()).unwrap();
return;
But for some reason the other task never notices:
select! {
    // Never runs:
    abort_ret = abort_in_fused => {
        // TODO match abort_ret {..}
        println!("abort signalled, handle_incoming returning");
        return;
    },
    ...
}
This can be seen in two ways:
1. The print in the abort_ret case never runs.
2. After the connection is closed on the other end, the process prints "Connection closed from the other end. Aborting." but it doesn't return. When I attach gdb I see this backtrace:
...
#14 0x000055f10391ab7e in tokio_executor::enter::Enter::block_on (self=0x7ffc167ba5a0, f=...) at /home/omer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-executor-0.2.0-alpha.2/src/enter.rs:121
#15 0x000055f103863a28 in tokio::runtime::threadpool::Runtime::shutdown_on_idle (self=...) at /home/omer/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-0.2.0-alpha.2/src/runtime/threadpool/mod.rs:219
#16 0x000055f10384c01b in chat0_client::main () at src/chat0_client.rs:36
So it's blocked in the Tokio event loop.
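To make the expected behaviour concrete, here is a minimal sketch of the mutual-abort pattern with Tokio taken out of the picture. It is not the program above: OS threads stand in for the two tasks, std::sync::mpsc channels stand in for the one-shot channels, and a recv_timeout call stands in for the select! block. The names Exit, worker, run_pair and the fail_after parameter are hypothetical and exist only for this illustration.

```rust
use std::sync::mpsc::{self, Receiver, RecvTimeoutError, Sender};
use std::thread;
use std::time::Duration;

/// Why a worker stopped (hypothetical type, for illustration only).
#[derive(Debug, PartialEq)]
enum Exit {
    /// The worker hit its own simulated error and signalled its peer.
    Failed,
    /// The worker saw the abort signal sent by its peer.
    Aborted,
}

/// Stand-in for handle_incoming / handle_outgoing.
/// `fail_after` simulates an I/O error after that many work steps;
/// `None` means this worker never fails on its own.
fn worker(abort_in: Receiver<()>, abort_out: Sender<()>, fail_after: Option<u32>) -> Exit {
    let mut steps = 0;
    loop {
        // Wait briefly for an abort signal. The timeout plays the role of
        // the second select! branch: control comes back to the loop even
        // when no signal has arrived.
        match abort_in.recv_timeout(Duration::from_millis(5)) {
            // Either an explicit signal or the peer dropping its sender
            // means this worker should stop.
            Ok(()) | Err(RecvTimeoutError::Disconnected) => return Exit::Aborted,
            Err(RecvTimeoutError::Timeout) => {}
        }
        // One unit of simulated work.
        steps += 1;
        if Some(steps) == fail_after {
            // Simulated error: notify the peer, then return.
            let _ = abort_out.send(());
            return Exit::Failed;
        }
    }
}

/// Wires two workers together the same way client_task wires the two tasks.
fn run_pair() -> (Exit, Exit) {
    let (abort_in_snd, abort_in_rcv) = mpsc::channel();
    let (abort_out_snd, abort_out_rcv) = mpsc::channel();
    // "incoming" fails after three steps; "outgoing" never fails by itself.
    let incoming = thread::spawn(move || worker(abort_in_rcv, abort_out_snd, Some(3)));
    let outgoing = thread::spawn(move || worker(abort_out_rcv, abort_in_snd, None));
    (incoming.join().unwrap(), outgoing.join().unwrap())
}

fn main() {
    let (incoming, outgoing) = run_pair();
    // prints "incoming: Failed, outgoing: Aborted"
    println!("incoming: {:?}, outgoing: {:?}", incoming, outgoing);
}
```

In this model the second worker returns shortly after the first one signals, which is the behaviour expected from the Tokio version. The sketch only describes the intent; it does not reproduce the problem, since it polls with a timeout instead of using futures and select!.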
In addition to a direct answer, I'm interested in pointers on how to debug this.
Thanks.