
First of all, let me say that I'm still learning Rust, so if this is an anti-pattern, please let me know.

Problem Statement

I am rewriting some WebSocket subscribers in Rust. Since they are mainly network-bound, I am using async heavily. However, some of the workers need to run other background tasks to better interpret the data coming over the WebSocket.

My current plan is to have a common function that handles the WebSocket subscription and general overhead (common_task below), with each worker only doing its own processing of a message (common_task_step below). In addition, workers can return a vector of futures (get_additional_tasks), which are awaited alongside the common function (inside run_worker).

But, I'm getting the following error:

error[E0597]: `*worker` does not live long enough
  --> src/main.rs:67:21
   |
65 | async fn run_worker<'a>(worker: Box<dyn LongWorker<'a>>) {
   |                         ------ lifetime `'1` appears in the type of `worker`
66 |     // This complains about worker being dropped while borrowed
67 |     let mut tasks = worker.get_additional_tasks().await;
   |                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
   |                     |
   |                     borrowed value does not live long enough
   |                     argument requires that `*worker` is borrowed for `'1`
...
75 | }
   | - `*worker` dropped here while still borrowed

For more information about this error, try `rustc --explain E0597`.

This confuses me, since the tasks are joined/awaited before worker is dropped.

What I have tried

I tried Box<dyn Future<Output = ()> + 'a> (LocalBoxFuture without the Pin), with the same issue.

I also tried Arc<dyn Future<Output = ()>> as well as Arc<Box<dyn Future<Output = ()>>>, inspired by this SO post, but ended up with the same issue.

Code


use std::time::SystemTime;
use futures::{FutureExt, future::LocalBoxFuture}; // 0.3.27
use async_trait::async_trait; // 0.1.66
use tokio::time::{interval, sleep, Duration};


#[async_trait]
trait LongWorker<'a> {
    async fn get_additional_tasks(&'a self) -> Vec<LocalBoxFuture<'a, ()>>;
    
    async fn common_task_step(&'a self, some_arg: u64) -> ();
}

// Nothing is done on tasks
struct NoOpWorker {}

#[async_trait]
impl<'a> LongWorker<'a> for NoOpWorker {
    async fn get_additional_tasks(&'a self) -> Vec<LocalBoxFuture<'a, ()>> {
        vec![]
    }
    
    async fn common_task_step(&'a self, some_arg: u64) {
        println!("Noop Got arg {}", some_arg);
    }
}

// There are some tasks to be done
struct SleepWorker {}

impl SleepWorker {
    async fn sleep(&self, millis: u64) -> () {
        sleep(Duration::from_millis(millis)).await;
        println!("Slept for {}ms", millis);
    }
}

#[async_trait]
impl<'a> LongWorker<'a> for SleepWorker {
    async fn get_additional_tasks(&'a self) -> Vec<LocalBoxFuture<'a, ()>> {
        vec![
            self.sleep(500).boxed_local(),
            self.sleep(1500).boxed_local(),
            self.sleep(2500).boxed_local(),
            self.sleep(3500).boxed_local(),
        ]
    }
    
    async fn common_task_step(&'a self, some_arg: u64) {
        println!("Sleeper Got arg {}", some_arg);
    }
}


async fn common_task<'a>(worker: &'a dyn LongWorker<'a>) {
    // IRL this would subscribe to websocket + handle shutdown signals etc.

    let mut interval = interval(Duration::from_millis(100));
    for _ in 1 .. 5 {
        interval.tick().await;
        worker.common_task_step(
            SystemTime::UNIX_EPOCH.elapsed().unwrap().as_millis() as u64
        ).await;
    }
}

async fn run_worker<'a>(worker: Box<dyn LongWorker<'a>>) {
    // This complains about worker being dropped while borrowed
    let mut tasks = worker.get_additional_tasks().await;
    // This works
    // let mut tasks = vec![];
    tasks.push(common_task(worker.as_ref()).boxed_local());
    
    // But future ends here (thus it is not borrowed anymore)
    futures::future::join_all(tasks).await;
    // And worker is dropped here
}


#[tokio::main(worker_threads = 5)]
async fn main() {
    let noop_worker = Box::new(NoOpWorker {});
    let sleep_worker = Box::new(SleepWorker {});
    
    run_worker(noop_worker).await;
    
    run_worker(sleep_worker).await;
}
balbok

1 Answer


The problem is the lifetime parameter on the trait, LongWorker<'a>. Because get_additional_tasks takes &'a self, calling it borrows the worker for the whole of 'a, and 'a is chosen by the caller of run_worker, so it necessarily outlives the function body in which worker is dropped. The parameter is unnecessary here; removing it (and adjusting the rest of the code accordingly) fixes the issue:

#[async_trait]
trait LongWorker {
    async fn get_additional_tasks(&self) -> Vec<LocalBoxFuture<'_, ()>>;
    
    async fn common_task_step(&self, some_arg: u64) -> ();
}

Whenever you run into "x dropped here while still borrowed" at the end of a function, that's a pretty good clue that your lifetimes are too restrictive.
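To illustrate why the method-level lifetime is enough, here is a minimal std-only sketch of the same shape. It is not the asker's code: the LocalBoxFuture alias is a stand-in for the one from the futures crate, get_additional_tasks is made a plain (non-async) method, and SleepWorker's base field, the hand-written AddTask future, and the NoopWake helper are hypothetical names added only so the example runs without tokio or async_trait.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Std-only stand-in for futures::future::LocalBoxFuture (assumption for this sketch).
type LocalBoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + 'a>>;

trait LongWorker {
    // '_ is tied to this one borrow of self, not to a lifetime on the trait.
    fn get_additional_tasks(&self) -> Vec<LocalBoxFuture<'_, u64>>;
}

// Hypothetical worker; `base` exists only so the futures visibly borrow it.
struct SleepWorker {
    base: u64,
}

// Hand-written future that holds a borrow of the worker and completes immediately.
struct AddTask<'a> {
    worker: &'a SleepWorker,
    delta: u64,
}

impl<'a> Future for AddTask<'a> {
    type Output = u64;

    fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<u64> {
        Poll::Ready(self.worker.base + self.delta)
    }
}

impl LongWorker for SleepWorker {
    fn get_additional_tasks(&self) -> Vec<LocalBoxFuture<'_, u64>> {
        let mut tasks: Vec<LocalBoxFuture<'_, u64>> = Vec::new();
        tasks.push(Box::pin(AddTask { worker: self, delta: 1 }));
        tasks.push(Box::pin(AddTask { worker: self, delta: 2 }));
        tasks
    }
}

// Waker that does nothing; enough to poll futures that are ready on first poll.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn run_worker(worker: Box<dyn LongWorker>) -> u64 {
    // The borrow of *worker lasts only as long as `tasks` is alive.
    let tasks = worker.get_additional_tasks();
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut total = 0;
    for mut task in tasks {
        // Each task is ready on its first poll in this sketch.
        if let Poll::Ready(value) = task.as_mut().poll(&mut cx) {
            total += value;
        }
    }
    // `tasks` was consumed by the loop, so nothing borrows `worker` when it drops.
    total
}
```

With these definitions, run_worker(Box::new(SleepWorker { base: 10 })) returns 23. The point is only that the borrow taken by get_additional_tasks ends once the returned futures are gone, which happens before worker is dropped at the end of run_worker.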

cdhowie
  • This worked. Thank you! Just to make sure I understand, in this case does `&self` translate to `&'static self` or `&'_ self` (with '_ being the same lifetime as one in LocalBoxFuture)? This SO post: https://stackoverflow.com/questions/71446778/anonymous-lifetime-in-function-return-type suggests it's the second one, in which case I'm confused how that is different to my original implementation. – balbok Mar 24 '23 at 01:28
  • @balbok The primary difference is that the lifetime belongs to the _trait itself_ with `LongWorker<'a>`, versus to the reference `&self` with `LongWorker`. `'_` in this case requests an automatic lifetime via lifetime elision. It's the same as `async fn get_additional_tasks<'a>(&'a self) -> Vec<LocalBoxFuture<'a, ()>>;`. – cdhowie Mar 24 '23 at 01:43