
I am writing an update server in Rust. It creates patches between two binary files and serves static files.

I am trying to do the following:

    let mut update_state;
    if let Some(state) = update_stream.next().await {
        if let Ok(state) = state {
            update_state = state
        } else if let Err(err) = state {
            reply = BuildOutput { error: "Update failed: ".to_string() + &err.to_string() }
        }
    } else {
        reply = BuildOutput { error: "Unreacheable".to_string() }
    }

    let state = update_state.borrow();
    let progress = state.histogram.progress();

    let res = update_stream.try_for_each(|_state| future::ready(Ok(()))).await;

but I get:

note: future is not `Send` as this value is used across an await
   --> server\grpc\src\rpc.rs:260:50
    |
259 |         let mut update_state;
    |             ---------------- has type `SharedUpdateProgress` which is not `Send`
260 |         if let Some(state) = update_stream.next().await {
    |                                                  ^^^^^^ await occurs here, with `mut update_state` maybe used later
...
305 |     }
    |     - `mut update_state` is later dropped here
    = note: required for the cast from `impl futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>>` to the object type `dyn futures::Future<Output = Result<tonic::Response<BuildOutput>, Status>> + std::marker::Send`

SharedUpdateProgress is defined as:

#[derive(Clone)]
pub struct SharedUpdateProgress {
    state: Rc<RefCell<UpdateProgress>>,
}

impl SharedUpdateProgress {
    pub fn new(target_revision: CleanName) -> Self {
        Self { state: Rc::new(RefCell::new(UpdateProgress::new(target_revision))) }
    }

    pub fn borrow(&self) -> Ref<'_, UpdateProgress> {
        self.state.borrow()
    }

    pub(crate) fn borrow_mut(&self) -> RefMut<'_, UpdateProgress> {
        self.state.borrow_mut()
    }
}

I don't know why this happens or how to fix it.

Vana

1 Answer


I assume a minimal reproducible example of your problem is as follows:

use std::{cell::RefCell, rc::Rc};
use tokio::time::{sleep, Duration};

#[derive(Clone)]
pub struct SharedString {
    state: Rc<RefCell<String>>,
}

impl SharedString {
    pub fn new(initial: &str) -> Self {
        Self {
            state: Rc::new(RefCell::new(initial.into())),
        }
    }
}

async fn run() {
    let shared_string = SharedString::new("Hello,");
    sleep(Duration::from_millis(1)).await;
    *shared_string.state.borrow_mut() += " world!";
    sleep(Duration::from_millis(1)).await;
    println!("{:?}", shared_string.state.borrow());
}

#[tokio::main]
async fn main() {
    tokio::task::spawn(run()).await.unwrap();
}

error: future cannot be sent between threads safely
   --> src/main.rs:27:24
    |
27  |     tokio::task::spawn(run()).await.unwrap();
    |                        ^^^^^ future returned by `run` is not `Send`
    |
    = help: within `impl Future<Output = ()>`, the trait `Send` is not implemented for `Rc<RefCell<String>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:19:36
    |
18  |     let shared_string = SharedString::new("Hello,");
    |         ------------- has type `SharedString` which is not `Send`
19  |     sleep(Duration::from_millis(1)).await;
    |                                    ^^^^^^ await occurs here, with `shared_string` maybe used later
...
23  | }
    | - `shared_string` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/martin/.cargo/git/checkouts/tokio-dd4afa005f1f4b79/686577b/tokio/src/task/spawn.rs:163:21
    |
163 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

The tokio runtime is usually multi-threaded, meaning that at any .await point your task could be moved from one thread to another. That is why everything that is held across an .await point must be Send. Rc<RefCell<>> is explicitly not Send, because Rc is a single-threaded reference counter.
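The important part of that rule is "held across": a non-Send value is only a problem if it is still alive when the task suspends. Here is a minimal sketch of that, using only the standard library. `require_send` and `block_on` are hypothetical helpers I wrote for this illustration (the first stands in for the `Send` bound on `tokio::spawn`, the second is a tiny executor), and `std::future::ready(()).await` stands in for a real await point.

```rust
use std::cell::RefCell;
use std::future::Future;
use std::pin::pin;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: only accepts `Send` futures, like the bound on `tokio::spawn`.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

// The `Rc` lives entirely inside the inner block, so it is gone before the
// `.await` and never becomes part of the future's state.
async fn rc_dropped_before_await() -> usize {
    let len = {
        let local = Rc::new(RefCell::new(String::from("Hello,")));
        local.borrow_mut().push_str(" world!");
        // Bind the result first so the temporary `Ref` is released before `local`.
        let n = local.borrow().len();
        n
    }; // `local` is dropped here
    std::future::ready(()).await; // stand-in for a real await point
    len
}

// Hypothetical minimal executor, enough for futures that never really wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn main() {
    // Compiles because nothing non-`Send` is alive across the `.await`.
    let fut = require_send(rc_dropped_before_await());
    println!("{}", block_on(fut));
}
```

If you move the `.await` inside the inner block, `require_send` should reject the future with the same kind of error as above. This only helps when the value does not need to outlive the await; if it does, you need a thread-safe type.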

Solution: Replace Rc<RefCell<>> with Arc<Mutex<>>, which is the thread-safe equivalent.

use std::sync::{Arc, Mutex};
use tokio::time::{sleep, Duration};

#[derive(Clone)]
pub struct SharedString {
    state: Arc<Mutex<String>>,
}

impl SharedString {
    pub fn new(initial: &str) -> Self {
        Self {
            state: Arc::new(Mutex::new(initial.into())),
        }
    }
}

async fn run() {
    let shared_string = SharedString::new("Hello,");
    sleep(Duration::from_millis(1)).await;
    *shared_string.state.lock().unwrap() += " world!";
    sleep(Duration::from_millis(1)).await;
    println!("{:?}", shared_string.state.lock().unwrap());
}

#[tokio::main]
async fn main() {
    tokio::task::spawn(run()).await.unwrap();
}

"Hello, world!"
Finomnis
  • I replaced it with `Arc` and still get an error. Full error message: ```let state = update_state.lock(); | ----- has type `std::sync::MutexGuard<'_, UpdateProgress>` which is not `Send` ``` – Vana Jan 11 '23 at 11:35
  • You can't hold the lock while you call `.await`. You have to release it before. If you want to hold it during an `.await`, you have to use `tokio`'s mutex. It's slower, though. – Finomnis Jan 11 '23 at 12:13
  • If I use tokio's Mutex, I have to use an async closure to call it, and async closures are unstable :s – Vana Jan 12 '23 at 11:53
  • @Vana Yes and no; async closures are in fact unstable, but closures with an `async` block inside are not: https://play.rust-lang.org/?version=stable&mode=debug&edition=2021&gist=e674b0b55f8bd4b377dfea65f3494074 And I personally haven't understood the use of async closures yet; so far, async blocks inside normal closures have been sufficient for all of my problems, including whatever you are trying to do with a mutex. – Finomnis Jan 16 '23 at 22:20
  • Yes, I will use an async block inside my closure instead of an async closure – Vana Jan 17 '23 at 09:32
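
For later readers, the two points from the comments can be sketched together: release the `std::sync::Mutex` guard before any `.await`, and use a normal closure whose body is an `async move` block instead of an async closure. This is only an illustration under assumptions, not the asker's actual code. It uses the standard library only; `require_send`, `block_on` and `run_demo` are hypothetical names, and `std::future::ready(()).await` stands in for a real await point. It does not use tokio's mutex, which you would need only if the lock really has to stay held during the `.await`.

```rust
use std::future::Future;
use std::pin::pin;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

// Hypothetical helper: only accepts `Send` futures, like the bound on `tokio::spawn`.
fn require_send<F: Future + Send>(fut: F) -> F {
    fut
}

// Hypothetical minimal executor, enough for futures that never really wait.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    loop {
        if let Poll::Ready(out) = fut.as_mut().poll(&mut cx) {
            return out;
        }
    }
}

fn run_demo() -> String {
    let shared = Arc::new(Mutex::new(String::from("Hello,")));

    // An ordinary closure that returns an `async move` block (stable Rust).
    let append = |state: Arc<Mutex<String>>, suffix: &'static str| async move {
        {
            let mut guard = state.lock().unwrap();
            guard.push_str(suffix);
        } // the guard is released here, before the await point
        std::future::ready(()).await; // stand-in for a real await point
    };

    // Compiles because no `MutexGuard` is alive across the `.await`.
    block_on(require_send(append(Arc::clone(&shared), " world!")));

    let result = shared.lock().unwrap().clone();
    result
}

fn main() {
    println!("{}", run_demo());
}
```

If the inner braces around the guard are removed, the guard stays alive across the await and `require_send` should reject the future with the `MutexGuard ... is not Send` error quoted in the first comment.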