1

I have an Arc<Mutex<Vec<i32>>>, my_data that holds some integers. I have a task that pushes integers (retrieved from an unbounded channel) and another task that gets the last integer from my_data, uses it and pushes a new integer back into my_data (and sends it over to a new channel).

The problem I'm having is: used of moved value: my_data. I cannot remove the move from when spawning a new task because of lifetimes. Each task needs to have ownership of the data.

The application will then spawn another task, obtain the lock of my_data in order to gain access to it, dump it to local disk and .drain() it after that.

use tokio::sync::Mutex;
use tokio::sync::mpsc;
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let my_data: Vec<i32> = vec![];
    let my_data = Arc::new(Mutex::new(my_data));

    let (tx1, mut rx1) = mpsc::unbounded_channel();
    tokio::spawn(async move {
        tx1.send(10).unwrap();
    }).await.unwrap();
    let handle1 = tokio::spawn(async move {
        while let Some(x) = rx1.recv().await {
            let my_data_clone = Arc::clone(&my_data);
            let mut my_data_lock = my_data_clone.lock().await;
            my_data_lock.push(x);
        }
    });

    let (tx2, mut rx2) = mpsc::unbounded_channel();
    tokio::spawn(async move {
        tx2.send(3).unwrap();
    }).await.unwrap();
    let handle2 = tokio::spawn(async move {
        while let Some(x) = rx2.recv().await {
            let my_data_clone = Arc::clone(&my_data); // 
            let mut my_data_lock = my_data_clone.lock().await;
            // Want: get last value, do something to it, then push it
            let last_value = my_data_lock.last(); // 10
            let value_to_push = x * last_value.unwrap(); // 3 * 10
            my_data_lock.push(value_to_push); // my_data: [10, 30]
        }
    });

    let (_, _) = tokio::join!(handle1, handle2);
}
user270199
  • 905
  • 1
  • 6
  • 12

1 Answers1

2

Reason: Ownership of my_data is moved inside of the first task (handle1) before you produce a clone for the second task. All of your current Arc::clone calls are useless.

Solution: Easiest way to solve is to produce a usable clone of the data before the first task starts executing. Code below only has one Arc::clone call and it's in the main function.

    ...

    // ADD THIS LINE:
    let my_data_clone = Arc::clone(&my_data);

    let handle1 = tokio::spawn(async move {
        while let Some(x) = rx1.recv().await {
            // REMOVE THIS: let my_data_clone = Arc::clone(&my_data);
            let mut my_data_lock = my_data.lock().await;
            my_data_lock.push(x);
        }
    });

    ...

    let handle2 = tokio::spawn(async move {
        while let Some(x) = rx2.recv().await {
            // REMOVE THIS: let my_data_clone = Arc::clone(&my_data); 
            let mut my_data_lock = my_data_clone.lock().await;
            let last_value = my_data_lock.last(); // 10
            let value_to_push = x * last_value.unwrap(); // 3 * 10
            my_data_lock.push(value_to_push); // my_data: [10, 30]
        }
    });

    ...
stepan
  • 1,043
  • 2
  • 8
  • 12
  • Thanks. If I have multiple tasks where I need to modify `my_data`, then I need to create a clone for each individual task. Is there a simpler way to achieve this? This question is probably too general to have a specific answer. I know this is unrelated to the original question I had (which I had already accepted your answer). – user270199 Feb 20 '22 at 13:56
  • 1
    @user270199 Arc simply makes sure that `my_data` stays alive (does not get dropped) as long as someone is using it (holds an Arc instance). One other way to ensure `my_data` exists long enough is to make it a global var: https://stackoverflow.com/questions/27791532/how-do-i-create-a-global-mutable-singleton – stepan Feb 20 '22 at 16:51