First of all, let me say that I'm still learning Rust; if this is an anti-pattern, please let me know.
Problem Statement
I am rewriting some websocket subscribers in Rust. Since they are mainly network-bound, I am using async heavily. However, some of the workers need to run other background tasks to better interpret the data coming over the websocket.
As such, my plan right now is to have a common function that handles the websocket subscription and general overhead (common_task below), with each worker only having to do the specific processing of a message (common_task_step below). In addition, workers can return a vector of futures (get_additional_tasks), which will be awaited alongside the common function (inside run_worker).
But, I'm getting the following error:
error[E0597]: `*worker` does not live long enough
  --> src/main.rs:67:21
   |
65 | async fn run_worker<'a>(worker: Box<dyn LongWorker<'a>>) {
   |                         ------ lifetime `'1` appears in the type of `worker`
66 |     // This complains about worker being dropped while borrowed
67 |     let mut tasks = worker.get_additional_tasks().await;
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |                     |
   |                     borrowed value does not live long enough
   |                     argument requires that `*worker` is borrowed for `'1`
...
75 | }
   | - `*worker` dropped here while still borrowed

For more information about this error, try `rustc --explain E0597`.
It confuses me, since tasks are joined/awaited before worker is dropped.
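One possible reading of the error, sketched without async: the lifetime is a parameter of the trait and the method takes &'a self, so calling it on a Box<dyn Worker<'a>> asks for a borrow that lasts for all of 'a. That lifetime is chosen by the caller of the function, so it extends past the function body, where the box is dropped, no matter when the returned tasks finish. The names Worker, Sleeper, run_owned, run_borrowed and demo are made up for this illustration, and plain closures stand in for the futures.

```rust
// Same shape as in the question: the lifetime belongs to the trait, and the
// method borrows `self` for exactly that lifetime.
trait Worker<'a> {
    fn tasks(&'a self) -> Vec<Box<dyn Fn() -> u64 + 'a>>;
}

// Hypothetical worker; `base` only exists so the closures borrow something.
struct Sleeper {
    base: u64,
}

impl<'a> Worker<'a> for Sleeper {
    fn tasks(&'a self) -> Vec<Box<dyn Fn() -> u64 + 'a>> {
        // Closures stand in for futures; both capture `self` for `'a`.
        let short: Box<dyn Fn() -> u64 + 'a> = Box::new(move || self.base + 500);
        let long: Box<dyn Fn() -> u64 + 'a> = Box::new(move || self.base + 1500);
        vec![short, long]
    }
}

// Expected to be rejected with E0597, like `run_worker` in the question:
// `'a` outlives the function body, but `worker` is dropped at its end.
//
// fn run_owned<'a>(worker: Box<dyn Worker<'a>>) -> u64 {
//     let tasks = worker.tasks();
//     tasks.iter().map(|task| task()).sum()
// }

// Accepted: the worker is only borrowed here, and its owner outlives `'a`.
fn run_borrowed<'a>(worker: &'a dyn Worker<'a>) -> u64 {
    let tasks = worker.tasks();
    let total: u64 = tasks.iter().map(|task| task()).sum();
    total
}

// The caller owns the worker, so the `'a` borrow ends before the drop.
fn demo() -> u64 {
    let sleeper = Sleeper { base: 1 };
    run_borrowed(&sleeper)
}
```

With base set to 1, demo() returns 2002. The owned variant is left commented out so that the sketch still compiles.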
What I have tried
I tried Box<dyn Future<Output = ()> + 'a> (LocalBoxFuture without the Pin), with the same issue.
I also tried Arc<dyn Future<Output = ()>> as well as Arc<Box<dyn Future<Output = ()>>>, inspired by this SO post, but ended up with the same issue.
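Changing the pointer type around the returned future leaves the &'a self in the method signature untouched, which may be why these attempts hit the same error. A different direction, sketched here only as an assumption about what could work, is to put the worker itself behind an Arc and let every task own a clone, so the futures borrow nothing from the caller. OwnedTask, fake_sleep, NoopWake and poll_once are hypothetical helpers; fake_sleep returns immediately so the sketch needs no runtime, and the inherent method replaces the trait method for brevity.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical alias: a boxed future that borrows nothing from its creator.
type OwnedTask = Pin<Box<dyn Future<Output = u64>>>;

struct SleepWorker {
    base: u64,
}

impl SleepWorker {
    // Stand-in for the real sleep: completes on the first poll.
    async fn fake_sleep(&self, millis: u64) -> u64 {
        self.base + millis
    }

    // Each task holds its own `Arc` clone, so it keeps the worker alive itself.
    fn get_additional_tasks(self: &Arc<Self>) -> Vec<OwnedTask> {
        let mut tasks: Vec<OwnedTask> = Vec::new();
        for millis in [500u64, 1500] {
            let worker = Arc::clone(self);
            tasks.push(Box::pin(async move { worker.fake_sleep(millis).await }));
        }
        tasks
    }
}

// Waker that does nothing; good enough for futures that never wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Polls a task once and reports its value if it finished.
fn poll_once(task: &mut OwnedTask) -> Option<u64> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    match task.as_mut().poll(&mut cx) {
        Poll::Ready(value) => Some(value),
        Poll::Pending => None,
    }
}

fn run_worker(worker: Arc<SleepWorker>) -> Vec<Option<u64>> {
    let mut tasks = worker.get_additional_tasks();
    // The tasks no longer depend on this handle being alive.
    drop(worker);
    tasks.iter_mut().map(poll_once).collect()
}
```

The trade-off is a reference count per task in exchange for not having to name any lifetime. In the real code the tasks would be handed to join_all instead of poll_once.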
Code
use std::time::SystemTime;

use futures::{FutureExt, future::LocalBoxFuture}; // 0.3.27
use async_trait::async_trait; // 0.1.66
use tokio::time::{interval, sleep, Duration};

#[async_trait]
trait LongWorker<'a> {
    async fn get_additional_tasks(&'a self) -> Vec<LocalBoxFuture<'a, ()>>;
    async fn common_task_step(&'a self, some_arg: u64) -> ();
}

// Nothing is done on tasks
struct NoOpWorker {}

#[async_trait]
impl<'a> LongWorker<'a> for NoOpWorker {
    async fn get_additional_tasks(&'a self) -> Vec<LocalBoxFuture<'a, ()>> {
        vec![]
    }

    async fn common_task_step(&'a self, some_arg: u64) {
        println!("Noop Got arg {}", some_arg);
    }
}

// There are some tasks to be done
struct SleepWorker {}

impl SleepWorker {
    async fn sleep(&self, millis: u64) -> () {
        sleep(Duration::from_millis(millis)).await;
        println!("Slept for {}ms", millis);
    }
}

#[async_trait]
impl<'a> LongWorker<'a> for SleepWorker {
    async fn get_additional_tasks(&'a self) -> Vec<LocalBoxFuture<'a, ()>> {
        vec![
            self.sleep(500).boxed_local(),
            self.sleep(1500).boxed_local(),
            self.sleep(2500).boxed_local(),
            self.sleep(3500).boxed_local(),
        ]
    }

    async fn common_task_step(&'a self, some_arg: u64) {
        println!("Sleeper Got arg {}", some_arg);
    }
}

async fn common_task<'a>(worker: &'a dyn LongWorker<'a>) {
    // IRL this would subscribe to websocket + handle shutdown signals etc.
    let mut interval = interval(Duration::from_millis(100));
    for _ in 1..5 {
        interval.tick().await;
        worker.common_task_step(
            SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64
        ).await;
    }
}

async fn run_worker<'a>(worker: Box<dyn LongWorker<'a>>) {
    // This complains about worker being dropped while borrowed
    let mut tasks = worker.get_additional_tasks().await;
    // This works
    // let mut tasks = vec![];
    tasks.push(common_task(worker.as_ref()).boxed_local());
    // But future ends here (thus it is not borrowed anymore)
    futures::future::join_all(tasks).await;
    // And worker is dropped here
}

#[tokio::main(worker_threads = 5)]
async fn main() {
    let noop_worker = Box::new(NoOpWorker {});
    let sleep_worker = Box::new(SleepWorker {});
    run_worker(noop_worker).await;
    run_worker(sleep_worker).await;
}
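For comparison, here is a reduced, std-only sketch of the same flow in which the lifetime sits on the method rather than on the trait, so each call borrows the worker only for as long as the returned futures are alive. It is an assumption that the same change carries over to the async_trait version above; that has not been checked here. The local LocalBoxFuture alias mirrors the futures type, while fake_sleep, NoopWake and join_all_blocking are hypothetical stand-ins for tokio's sleep and futures::future::join_all.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Local stand-in for futures::future::LocalBoxFuture.
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

// No lifetime on the trait: the borrow is scoped to each method call.
trait LongWorker {
    fn get_additional_tasks<'s>(&'s self) -> Vec<LocalBoxFuture<'s, u64>>;
}

struct SleepWorker {
    base: u64,
}

impl SleepWorker {
    // Stand-in for the real sleep: completes on the first poll.
    async fn fake_sleep(&self, millis: u64) -> u64 {
        self.base + millis
    }
}

impl LongWorker for SleepWorker {
    fn get_additional_tasks<'s>(&'s self) -> Vec<LocalBoxFuture<'s, u64>> {
        let first: LocalBoxFuture<'s, u64> = Box::pin(self.fake_sleep(500));
        let second: LocalBoxFuture<'s, u64> = Box::pin(self.fake_sleep(1500));
        vec![first, second]
    }
}

// Waker that does nothing; the toy joiner below simply polls in a loop.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

// Toy replacement for join_all: busy-polls every unfinished task until all
// are ready. Fine for a sketch, not something to use with real I/O.
fn join_all_blocking<'s>(mut tasks: Vec<LocalBoxFuture<'s, u64>>) -> Vec<u64> {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut results: Vec<Option<u64>> = vec![None; tasks.len()];
    while results.iter().any(|r| r.is_none()) {
        for (i, task) in tasks.iter_mut().enumerate() {
            if results[i].is_none() {
                if let Poll::Ready(value) = task.as_mut().poll(&mut cx) {
                    results[i] = Some(value);
                }
            }
        }
    }
    results.into_iter().flatten().collect()
}

fn run_worker(worker: Box<dyn LongWorker>) -> Vec<u64> {
    // The borrow of `*worker` lasts only as long as `tasks` does.
    let tasks = worker.get_additional_tasks();
    let results = join_all_blocking(tasks);
    // `tasks` was consumed above, so `worker` can be dropped here.
    results
}
```

Calling run_worker(Box::new(SleepWorker { base: 1 })) yields [501, 1501]. The point of the sketch is only the signature change: with the borrow scoped to the call, the compiler can see that the futures are gone before the box is dropped.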