1

I want to build a single-producer multiple-consumer example in Rust, where the producer is bounded to have no more than 10 outstanding items. I modeled a solution in C that uses a a mutex and two condvars. One condvar is to wait the consumers when there is nothing to consume and one condvar is to wait for the producer when the unconsumed items count is greater than say 10. The C code is below.

As I understand it from the Rust docs, there must be a 1-1 connection between std::sync::Mutex and a std::sync::Condvar so I can't make an exact translation of my C solution.

Is there some other way to achieve the same end (that I cannot see) in Rust using std::sync::Mutex and std::sync::Condvar.

#define _GNU_SOURCE
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>

//
// This is a simple example of using a mutex and 2 condition variables to
// sync a single writer and multiple readers interacting with a bounded (fixed max size) queue
//
// in this toy example a queue is simulated by an int counter n_resource
//

int n_resource;
pthread_cond_t rdr_cvar;
pthread_cond_t wrtr_cvar;
pthread_mutex_t mutex;

void reader(void* data)
{
    long id = (long)data;
    for(;;) {

        pthread_mutex_lock(&mutex);
        while (n_resource <= 0) {
            pthread_cond_wait(&rdr_cvar, &mutex);
        }
        printf("Reader %ld n_resource = %d\n", id, n_resource);
        --n_resource;
        // if there are still things to read - singla one reader
        if(n_resource > 0) {
            pthread_cond_signal(&rdr_cvar);
        }
        // if there is space for the writer to add another signal the writer
        if(n_resource < 10) {
            pthread_cond_signal(&wrtr_cvar);
        }
        pthread_mutex_unlock(&mutex);
    }
}
void writer(void* data)
{
    for(;;) {

        pthread_mutex_lock(&mutex);
        printf("Writer before while n_resource %d \n", n_resource);
        while (n_resource > 10) {
            pthread_cond_wait(&wrtr_cvar, &mutex);
        }
        printf("Writer after while n_resource %d \n", n_resource);

        ++n_resource;
        // if there is something for a reader to read signal one of the readers.
        if(n_resource > 0) {
            pthread_cond_signal(&rdr_cvar);
        }
        pthread_mutex_unlock(&mutex);
    }
}

int main()
{
    pthread_t rdr_thread_1;
    pthread_t rdr_thread_2;
    pthread_t wrtr_thread;
    pthread_mutex_init(&mutex, NULL);
    pthread_cond_init(&rdr_cvar, NULL);
    pthread_cond_init(&wrtr_cvar, NULL);
    pthread_create(&rdr_thread_1, NULL, &reader, (void*)1L);
    pthread_create(&rdr_thread_2, NULL, &reader, (void*)2L);
    pthread_create(&wrtr_thread, NULL, &writer, NULL);
    pthread_join(wrtr_thread, NULL);
    pthread_join(rdr_thread_1, NULL);
    pthread_join(rdr_thread_2, NULL);
}
Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
roblackwell
  • 51
  • 1
  • 6
  • It looks like your question might be answered by the answers of [Buffer in Rust with Mutex and Condvar](https://stackoverflow.com/a/47554383/155423) which shows using two `Condvar`s with one `Mutex`. If not, please **[edit]** your question to explain the differences. Otherwise, we can mark this question as already answered. – Shepmaster Sep 30 '20 at 00:35

1 Answers1

0

While a CondVar needs to be associated with only one Mutex, it is not necessary that a Mutex is associated with only one CondVar.

For example, the following code seems to work just fine - you can run it on the playground.

use std::sync::{Arc, Condvar, Mutex};
use std::thread;

struct Q {
    rdr_cvar: Condvar,
    wrtr_cvar: Condvar,
    mutex: Mutex<i32>,
}

impl Q {
    pub fn new() -> Q {
        Q {
            rdr_cvar: Condvar::new(),
            wrtr_cvar: Condvar::new(),
            mutex: Mutex::new(0),
        }
    }
}

fn writer(id: i32, qq: Arc<Q>) {
    let q = &*qq;
    for i in 0..10 {
        let guard = q.mutex.lock().unwrap();
        let mut guard = q.wrtr_cvar.wait_while(guard, |n| *n > 3).unwrap();

        println!("{}: Writer {} n_resource = {}\n", i, id, *guard);
        *guard += 1;

        if *guard > 0 {
            q.rdr_cvar.notify_one();
        }
        if *guard < 10 {
            q.wrtr_cvar.notify_one();
        }
    }
}

fn reader(id: i32, qq: Arc<Q>) {
    let q = &*qq;
    for i in 0..10 {
        let guard = q.mutex.lock().unwrap();
        let mut guard = q.rdr_cvar.wait_while(guard, |n| *n <= 0).unwrap();

        println!("{} Reader {} n_resource = {}\n", i, id, *guard);
        *guard -= 1;

        if *guard > 0 {
            q.rdr_cvar.notify_one();
        }
        if *guard < 10 {
            q.wrtr_cvar.notify_one();
        }
    }
}

fn main() {
    let data = Arc::new(Q::new());
    let data2 = data.clone();

    let t1 = thread::spawn(move || writer(0, data2));
    let t2 = thread::spawn(move || reader(1, data));

    t1.join().unwrap();
    t2.join().unwrap();
}
Michael Anderson
  • 70,661
  • 7
  • 134
  • 187
  • Does that mean you concur with the duplicate? – Shepmaster Sep 30 '20 at 00:37
  • I think the duplicate does address the surrounding issue --- implementing a producer-consumer queue --- but doesn't directly address the misunderstanding at the heart of this question, which is "As I understand it from the Rust docs, there must be a 1-1 connection between std::sync::Mutex and a std::sync::Condvar so I can't make an exact translation of my C solution." - which my first sentence points out is incorrect. – Michael Anderson Sep 30 '20 at 00:44