3

I want to able to start a future running in the background, and not wait for it immediately in the parent function scope.

Something like a dynamic join_all where I can add new futures in a loop to a set, and then pass the set to another function which can .await the whole set (that is already running).

I want to be able to do something like this:

join_all(vec![
    log_arg(&c),
    log_arg(&c)
]).await;

But the issues are:

  • .await starts the future executing, but also waits for it at the current function.
    • How do I start the execution without waiting for it?
  • &c is not 'static
    • Seems to be a requirement for all of the Tokio API's that "start the future executing without waiting for the result in the current fn scope", e.g spawn_local
    • It is OK if all the futures are on a single thread.

Example:

https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=85aa3a517bd1906b285f5a5586d7fa6d

zino
  • 1,222
  • 2
  • 17
  • 47
  • You can always [poll](https://doc.rust-lang.org/1.54.0/std/future/trait.Future.html#tymethod.poll) the future manually to make it progress until its next `await`. – Jmb Oct 19 '21 at 06:32

2 Answers2

2

How do I start the execution without waiting for it?

spawn a task.

Seems to be a requirement for all of the Tokio API's that "start the future executing without waiting for the result in the current fn scope", e.g spawn_local

Well yes, since you're spawning a task it's possible that the task outlives whatever owns the item, resulting in a dangling reference, which is not allowed. In fact it's pretty much a guarantee when using spawn_local: it's going to spawn a task on the same thread (/scheduler), and that will not be able to run at all until the current task yields or terminates.

The alternative would be to use "scoped tasks" (which don't have to be immediately waited on, but have to eventually be joined). However support for structured concurrency (scoped tasks) in tokio have so far died on the vine. So there is no way for the Rust compiler to know that a task does not "escape" from the scope which intiialised it, therefore it has to assume it does, and thus that whatever the task captures should be able to outlive the current scope.

Masklinn
  • 34,759
  • 3
  • 38
  • 57
  • Thanks. To summarise, you are saying that there is absolutely no way to run a future without waiting for it (on the same thread, with args that are not 'static and not Send)? – zino Oct 18 '21 at 15:15
  • 1
    Not in Tokio at least. Other reactors may have better structured concurrency support and thus provide a "scoped spawn" where that would work (though it would restrict the sub-task as one which would have to end before its creator function). – Masklinn Oct 18 '21 at 15:35
0

Here's a wg-async scope proposal and here's tokio scoped tasks issue.

When I had a similar problem, I first investigated async-scoped (async version of crossbeam::scope, or rayon::scope). In the end, I instead decided to write a new type which spawns a task for each future (or, in my case a Stream<Item=String>) and when it is dropped, it ensures the tasks are finalized. That's essentially a poor-person implementation of the "scoped" idea.

// how long to wait to print remaining container logs
const DRAIN_LOGS_TIMEOUT: Duration = Duration::from_secs(5);

pub(crate) struct LogPrinter {
    tasks: Vec<JoinHandle<()>>,
}

impl Drop for LogPrinter {
    fn drop(&mut self) {
        for task in self.tasks.drain(..).rev() {
            println!("Finalizing LogPrinter");
            let result = futures::executor::block_on(
                tokio::time::timeout(DRAIN_LOGS_TIMEOUT, task));
            println!("Finalized LogPrinter {:?}", result);
        }
    }
}

impl LogPrinter {
    pub(crate) fn new() -> Self {
        Self {
            tasks: vec![],
        }
    }

    pub(crate) fn print(&mut self, log_stream: Fuse<(impl Stream<Item=String> + Unpin + Sized + Send + 'static)>) {
        let mut s = log_stream;
        let handle = tokio::spawn(async move {
            while select! {
                msg = s.next() => match msg {
                    Some(msg) => {
                        let msg: String = msg;
                        println!("{}", msg);
                        true
                    },
                    None => false
                },
            } {}
        });
        self.tasks.push(handle);
    }
}

I am using it like this

let mut log_printer: LogPrinter = LogPrinter::new();
let logs_skrouterd: impl Stream<Item=String> = stream_container_logs(&docker, "skrouterd", &skrouterd);

log_printer.print(logs_skrouterd.fuse());

The drop gets executed when log_printer goes out of scope.

user7610
  • 25,267
  • 15
  • 124
  • 150