Editor's note: This question is from a version of Rust prior to 1.0 and uses terms and functions that do not exist in Rust 1.0 code. The concepts expressed are still relevant.

I need to read data provided by an external process via a POSIX file descriptor in my Rust program. The file descriptor stays open for a very long time (hours) and the other side passes data to me from time to time. I need to read and process the data stream continuously.

To do so, I wrote a loop that calls libc::read() (readv actually) to read the data and processes it when received. Since this would block the whole scheduler, I'm spawning a task on a new scheduler (task::spawn_sched(SingleThreaded)). This works fine as long as it runs, but I can't find a way to cleanly shut down the loop.

Since the loop is blocking most of the time, I can't use a port/channel to notify the loop to exit.

I tried to kill the loop task with a failing linked task: spawn the loop task supervised, spawn a linked task within it, and wait for a signal on a port before fail!()ing and taking the loop task down with it. It works well in tests, but the libc::read() isn't interrupted (the task doesn't fail until the read finishes and it hits task::yield() at some point).

I learned a lot looking at libcore sources, but I can't seem to find a proper solution.

  1. Is there a way to kill a (child) task in Rust even if it's doing some long external function call like a blocking read?
  2. Is there a way to do non-blocking reads on a POSIX file descriptor so that Rust keeps control over the task?
  3. How can I react to signals, e.g. SIGTERM if the user terminates my program? There doesn't seem to be something like sigaction() in Rust yet.
Shepmaster
Zargony
  • Seems like this isn't possible currently, but there's ongoing effort to improve async I/O: https://github.com/mozilla/rust/issues/4419 – Zargony Jun 07 '13 at 20:10
  • Are there any updates on non-blocking I/O now in Rust 1.0? – jocull Jun 04 '15 at 16:11
  • This comment is a lot later, but since the question doesn't have an accepted answer I'm putting it here: the io story has changed quite a bit since 1.0. Currently there is a great focus on async i/o using mio, based on the kernel libs for linux and windows (can't remember them off-hand). There may be some syntax-level functionality soon. More generally, if a worker thread has a work loop, this loop can check for a shutdown message. If it's blocked on i/o, I don't know if it's possible to wake it. I'd be interested to learn more about this. – derekdreery Mar 04 '17 at 13:47

2 Answers

  1. According to Mozilla, killing a task is no longer possible for now, let alone one stuck in a blocking read.
  2. It will be possible to do so after mozilla/rust/pull/11410, see also my other issue report for rust-zmq erickt/rust-zmq/issues/24 which also depends on this. (sorry about the links)
  3. Maybe the signal listener will work for you.
Rashad
Fantix King
Is there a way to kill a (child) task in Rust even if it's doing some long external function call like a blocking read?

No.
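
Since the thread cannot be killed from outside, the usual workaround is cooperative shutdown: bound each read with a timeout and check a flag between reads. The following is a minimal sketch using only the standard library. It assumes a `UnixStream` pair as a stand-in for the external file descriptor, and the name `read_until_stopped` and the 50 ms timeout are hypothetical choices for illustration.

```rust
use std::io::{ErrorKind, Read, Write};
use std::os::unix::net::UnixStream;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use std::thread;
use std::time::Duration;

/// Reads from `input` until EOF or until `stop` is set, returning every byte seen.
/// (Hypothetical helper, not part of any library.)
fn read_until_stopped(mut input: UnixStream, stop: Arc<AtomicBool>) -> std::io::Result<Vec<u8>> {
    // Bound each read so the loop regains control periodically.
    input.set_read_timeout(Some(Duration::from_millis(50)))?;
    let mut collected = Vec::new();
    let mut buf = [0u8; 1024];
    while !stop.load(Ordering::SeqCst) {
        match input.read(&mut buf) {
            Ok(0) => break, // the other side closed the connection
            Ok(n) => collected.extend_from_slice(&buf[..n]),
            // A timeout surfaces as WouldBlock or TimedOut depending on the platform.
            Err(e)
                if matches!(
                    e.kind(),
                    ErrorKind::WouldBlock | ErrorKind::TimedOut | ErrorKind::Interrupted
                ) =>
            {
                continue
            }
            Err(e) => return Err(e),
        }
    }
    Ok(collected)
}

fn main() -> std::io::Result<()> {
    let (mut writer, reader) = UnixStream::pair()?;
    let stop = Arc::new(AtomicBool::new(false));
    let worker = {
        let stop = Arc::clone(&stop);
        thread::spawn(move || read_until_stopped(reader, stop))
    };

    writer.write_all(b"hello")?;
    thread::sleep(Duration::from_millis(200));

    // Ask the worker to exit; it notices within one read timeout.
    stop.store(true, Ordering::SeqCst);
    let data = worker.join().expect("worker panicked")?;
    println!("worker read {} bytes before shutdown", data.len());
    Ok(())
}
```

The trade-off is latency versus wakeups: a shorter timeout makes shutdown more responsive but wakes the thread more often while idle.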

Is there a way to do non-blocking reads [...] so that Rust keeps control over the task?

Yes.

on a POSIX file descriptor

Yes.
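
As a rough illustration of what a non-blocking read looks like without any async runtime, here is a sketch using only the standard library. It assumes a `UnixStream` pair in place of the real descriptor; for an arbitrary raw file descriptor you would typically set `O_NONBLOCK` yourself (for example with `fcntl` from the `libc` crate), which is not shown here. The helper name `try_read` is hypothetical.

```rust
use std::io::{self, ErrorKind, Read, Write};
use std::os::unix::net::UnixStream;

/// Makes one read attempt on a stream already in non-blocking mode.
/// Returns Ok(None) when no data is ready, Ok(Some(bytes)) otherwise
/// (an empty vector means the other side closed the connection).
fn try_read(input: &mut UnixStream) -> io::Result<Option<Vec<u8>>> {
    let mut buf = [0u8; 1024];
    match input.read(&mut buf) {
        Ok(n) => Ok(Some(buf[..n].to_vec())),
        // In non-blocking mode, "nothing to read yet" is reported as WouldBlock.
        Err(e) if e.kind() == ErrorKind::WouldBlock => Ok(None),
        Err(e) => Err(e),
    }
}

fn main() -> io::Result<()> {
    let (mut writer, mut reader) = UnixStream::pair()?;
    reader.set_nonblocking(true)?;

    // Nothing has been written yet, so this returns immediately without data.
    println!("before write: {:?}", try_read(&mut reader)?);

    writer.write_all(b"hello")?;
    println!("after write: {:?}", try_read(&mut reader)?);
    Ok(())
}
```

Polling like this in a tight loop burns CPU, which is why readiness notification (as the event loop in the complete example does for you) is generally preferable.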

How can I react to signals

Decide your desired platform support, then pick an appropriate crate.

Putting it all together

use future::Either;
use signal_hook::iterator::Signals;
use std::os::unix::io::FromRawFd;
use tokio::{fs::File, io, prelude::*};

type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;

fn main() -> Result<()> {
    // Turn SIGUSR1 deliveries into an asynchronous stream of signal numbers.
    let signals = Signals::new(&[signal_hook::SIGUSR1])?;
    let signals = signals.into_async()?;

    // File descriptor 5 is opened by shim.sh before this program starts.
    // This is unsafe because nothing checks that the descriptor is valid.
    let input = unsafe { std::fs::File::from_raw_fd(5) };
    let input = File::from_std(input);
    let lines = io::lines(std::io::BufReader::new(input));

    // Wrap both streams in a common item type so they can be merged.
    let signals = signals.map(Either::A);
    let lines = lines.map(Either::B);

    let combined = signals.select(lines);

    tokio::run({
        combined
            .map_err(|e| panic!("Early error: {}", e))
            .for_each(|v| match v {
                // Returning Err stops the stream, which ends the program.
                Either::A(signal) => {
                    println!("Got signal: {:?}", signal);
                    Err(())
                }
                // Returning Ok keeps processing further lines.
                Either::B(data) => {
                    println!("Got data: {:?}", data);
                    Ok(())
                }
            })
    });

    Ok(())
}

Cargo.toml

[package]
name = "future_example"
version = "0.1.0"
authors = ["An Devloper <an.devloper@example.com>"]
edition = "2018"

[dependencies]
tokio = "0.1.22"
signal-hook = { version = "0.1.9", features = ["tokio-support"] }

shim.sh

#!/bin/bash

set -eu

exec 5< /tmp/testpipe
exec ./target/debug/future_example

Execution

cargo build
mkfifo /tmp/testpipe
./shim.sh

Another terminal

printf 'hello\nthere\nworld' > /tmp/testpipe
kill -s usr1 $PID_OF_THE_PROCESS
Shepmaster