5

I'm trying to use a Condvar to limit the number of threads that are active at any given time. I'm having a hard time finding good examples on how to use Condvar. So far I have:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let thread_count_arc = Arc::new((Mutex::new(0), Condvar::new()));
    let mut i = 0;
    while i < 100 {
        let thread_count = thread_count_arc.clone();
        thread::spawn(move || {
            let &(ref num, ref cvar) = &*thread_count;
            {
                let mut start = num.lock().unwrap();
                if *start >= 20 {
                    cvar.wait(start);
                }
                *start += 1;
            }
            println!("hello");
            cvar.notify_one();
        });
        i += 1;
    }
}

The compiler error given is:

error[E0382]: use of moved value: `start`
  --> src/main.rs:16:18
   |
14 |                     cvar.wait(start);
   |                               ----- value moved here
15 |                 }
16 |                 *start += 1;
   |                  ^^^^^ value used here after move
   |
   = note: move occurs because `start` has type `std::sync::MutexGuard<'_, i32>`, which does not implement the `Copy` trait

I'm entirely unsure if my use of Condvar is correct. I tried staying as close as I could to the example in the Rust API documentation. What is the proper way to implement this?

Shepmaster
Dumbapples

4 Answers

4

Here's a version that compiles:

use std::{
    sync::{Arc, Condvar, Mutex},
    thread,
};

fn main() {
    let thread_count_arc = Arc::new((Mutex::new(0u8), Condvar::new()));
    let mut i = 0;
    while i < 100 {
        let thread_count = thread_count_arc.clone();
        thread::spawn(move || {
            let (num, cvar) = &*thread_count;

            let mut start = cvar
                .wait_while(num.lock().unwrap(), |start| *start >= 20)
                .unwrap();

            // Before Rust 1.42, use this:
            //
            // let mut start = num.lock().unwrap();
            // while *start >= 20 {
            //     start = cvar.wait(start).unwrap()
            // }

            *start += 1;

            println!("hello");
            cvar.notify_one();
        });
        i += 1;
    }
}

The important part can be seen from the signature of Condvar::wait_while or Condvar::wait:

pub fn wait_while<'a, T, F>(
    &self,
    guard: MutexGuard<'a, T>,
    condition: F
) -> LockResult<MutexGuard<'a, T>>
where
    F: FnMut(&mut T) -> bool,

pub fn wait<'a, T>(
    &self,
    guard: MutexGuard<'a, T>
) -> LockResult<MutexGuard<'a, T>>

This says that wait_while / wait consumes the guard, which is why you get the error you did - you no longer own start, so you can't call any methods on it!

These functions are doing a great job of reflecting how Condvars work - you give up the lock on the Mutex (represented by start) for a while, and when the function returns you get the lock again.

The fix is to hand the guard over and then rebind start to the lock guard that wait_while / wait returns. I've also switched from an if to a while, as encouraged by huon, to protect against spurious wake-ups.
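To make the guard round-trip concrete, here is a minimal sketch that goes a little further than the code above: it also decrements the counter when a thread finishes and joins every thread so that main does not exit early. Neither of those is part of the original snippet. The names `run_limited` and `LIMIT`, and the extra "peak" field used to observe the limit, are assumptions made up for illustration.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

const LIMIT: u32 = 20; // same cap as in the question

/// Runs `total` threads, at most `LIMIT` at a time, and returns the highest
/// number of threads that were inside the limited section at once.
/// (Hypothetical helper, not part of the original answer.)
fn run_limited(total: usize) -> u32 {
    // Shared state is (currently active, peak active).
    let shared = Arc::new((Mutex::new((0u32, 0u32)), Condvar::new()));

    let handles: Vec<_> = (0..total)
        .map(|_| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                let (lock, cvar) = &*shared;

                // `wait_while` takes the guard by value and hands it back
                // once the condition is false (requires Rust 1.42+).
                let mut state = cvar
                    .wait_while(lock.lock().unwrap(), |s| s.0 >= LIMIT)
                    .unwrap();
                state.0 += 1;
                let active = state.0;
                if active > state.1 {
                    state.1 = active;
                }
                drop(state); // release the lock while doing the "work"

                thread::yield_now(); // stand-in for real work

                // Free the slot, then wake one waiter.
                lock.lock().unwrap().0 -= 1;
                cvar.notify_one();
            })
        })
        .collect();

    // Joining keeps the caller alive until every thread has finished.
    for handle in handles {
        handle.join().unwrap();
    }

    let peak = shared.0.lock().unwrap().1;
    peak
}

fn main() {
    let peak = run_limited(100);
    println!("peak concurrent threads: {}", peak);
}
```

The peak value varies from run to run, but it should never exceed the limit, because a thread only increments the counter while holding the guard that `wait_while` returned.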

Shepmaster
  • Ah I see. I have another question based on the output of this piece of code. It seems that "hello" is being printed between 20 and 30 times rather than the expected 100. Would this be because the main thread ends before all of the threads are finished? – Dumbapples Apr 26 '15 at 02:03
  • 1
    @Dumbapples yeah, that's why I said that this version "compiles" not that it "works" ^_^ I ran it once and saw some output, and ran it again and saw no output. The program will definitely exit when the main thread ends, and there's nothing keeping it around. There are lots of solutions for that though! – Shepmaster Apr 26 '15 at 02:05
  • I found a bug in my code. I think the current code will cause every thread after the 20th one to wait, regardless of how many threads are currently running. I would have to decrement the number in the arc somewhere. Would obtaining a lock on the Arc between the print statement and the notify and decrementing it solve this problem? – Dumbapples Apr 26 '15 at 02:17
  • I suspect the `else` branch of the `if` isn't necessary: `let mut start = num.lock().unwrap(); while *start >= 20 { start = cvar.wait(start).unwrap(); }` (NB. I've also switched the `if` to a `while`, to protect against spurious wake-ups, which may occur with `Condvar`). – huon Apr 27 '15 at 02:02
1

For reference, the usual way to have a limited number of threads in a given scope is with a Semaphore.

Unfortunately, Semaphore was never stabilized, was deprecated in Rust 1.8 and was removed in Rust 1.9. There are crates available that add semaphores on top of other concurrency primitives.

let sema = Arc::new(Semaphore::new(20));

for i in 0..100 {
    let sema = sema.clone();
    thread::spawn(move || {
        // The permit is released when `_guard` goes out of scope.
        let _guard = sema.acquire();
        println!("{}", i);
    });
}

This isn't quite doing the same thing, since each thread does not print the total number of threads that were inside the scope when it entered.
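As an illustration of what "a semaphore on top of other concurrency primitives" can look like, here is a rough sketch built from a Mutex and a Condvar. This is not the removed `std::sync::Semaphore` and not the API of any particular crate; the types `Semaphore` and `Permit` and the helper `run_with_semaphore` are hypothetical names invented for this example.

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

/// Hypothetical counting semaphore (not the removed `std::sync::Semaphore`).
struct Semaphore {
    permits: Mutex<usize>,
    cvar: Condvar,
}

/// Hands its permit back to the semaphore when dropped.
struct Permit<'a> {
    sema: &'a Semaphore,
}

impl Semaphore {
    fn new(permits: usize) -> Self {
        Semaphore {
            permits: Mutex::new(permits),
            cvar: Condvar::new(),
        }
    }

    /// Blocks until a permit is free, then takes it.
    fn acquire(&self) -> Permit<'_> {
        let mut permits = self.permits.lock().unwrap();
        // Loop rather than `if`: `wait` may wake up spuriously.
        while *permits == 0 {
            permits = self.cvar.wait(permits).unwrap();
        }
        *permits -= 1;
        Permit { sema: self }
    }
}

impl Drop for Permit<'_> {
    fn drop(&mut self) {
        // Return the permit, then wake one waiting thread.
        *self.sema.permits.lock().unwrap() += 1;
        self.sema.cvar.notify_one();
    }
}

/// Spawns `total` threads that each hold a permit while running and
/// returns how many of them were joined successfully.
fn run_with_semaphore(total: usize, permits: usize) -> usize {
    let sema = Arc::new(Semaphore::new(permits));
    let handles: Vec<_> = (0..total)
        .map(|i| {
            let sema = Arc::clone(&sema);
            thread::spawn(move || {
                let _permit = sema.acquire();
                println!("{}", i);
            })
        })
        .collect();

    let mut finished = 0;
    for handle in handles {
        handle.join().unwrap();
        finished += 1;
    }
    finished
}

fn main() {
    let finished = run_with_semaphore(100, 20);
    println!("{} threads finished", finished);
}
```

Tying the release to `Drop` means a permit is returned even if the thread leaves the scope early, which is the main reason to prefer a guard over paired acquire/release calls.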

Shepmaster
huon
  • 1
    Yeah, I really wanted to use a Semaphore, but the whole unstable thing. Even better would be a single-producer, multi-consumer channel and a 20-count threadpool! But the question did specifically ask about a Condvar, so we give the people what they want. ^_^ – Shepmaster Apr 27 '15 at 13:46
0

I realized the code I provided didn't do exactly what I wanted it to, so I'm putting this edit of Shepmaster's code here for future reference.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let thread_count_arc = Arc::new((Mutex::new(0u8), Condvar::new()));
    let mut i = 0;
    while i < 150 {
        let thread_count = thread_count_arc.clone();
        thread::spawn(move || {
            let x;
            let &(ref num, ref cvar) = &*thread_count;
            {
                let mut start = num.lock().unwrap();
                // Loop rather than `if`: `wait` may wake up spuriously.
                while *start >= 20 {
                    start = cvar.wait(start).unwrap();
                }
                *start += 1;
                x = *start;
            }
            println!("{}", x);
            {
                let mut counter = num.lock().unwrap();
                *counter -= 1;
            }
            cvar.notify_one();
        });
        i += 1;
    }
    println!("done");
}

Running this in the playground should show more or less expected behavior.

Shepmaster
Dumbapples
-1

You want to use a while loop, and re-assign start at each iteration, like:

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

fn main() {
    let thread_count_arc = Arc::new((Mutex::new(0), Condvar::new()));
    let mut i = 0;
    while i < 100 {
        let thread_count = thread_count_arc.clone();
        thread::spawn(move || {
            let &(ref num, ref cvar) = &*thread_count;
            let mut start = num.lock().unwrap();
            while *start >= 20 {
                let current = cvar.wait(start).unwrap();
                start = current;
            }
            *start += 1;
            println!("hello");
            cvar.notify_one();
        });
        i += 1;
    }
}

See also these articles on the topic:

https://medium.com/@polyglot_factotum/rust-concurrency-five-easy-pieces-871f1c62906a

https://medium.com/@polyglot_factotum/rust-concurrency-patterns-condvars-and-locks-e278f18db74f

gterzian
  • Please [edit] your answer to explain the differences between it and the existing answers. Your code appears identical to the [existing accepted answer](https://stackoverflow.com/a/29872820/155423). – Shepmaster Apr 28 '20 at 12:31
  • Well, the accepted answer [before I updated it](https://stackoverflow.com/revisions/29872820/4). – Shepmaster Apr 28 '20 at 12:42