1

I'm trying to implement a buffer with a single consumer and a single producer. Until now I have only used POSIX semaphores; since they're not available in Rust, I'm trying to solve this trivial semaphore problem with Rust's sync primitives (Mutex, Condvar, Barrier, ...), and I don't want to use channels.

My code behaves erratically: sometimes it runs to completion, sometimes it stops at some number, and sometimes it doesn't start counting at all.

Things appear to work better if I wait 1 second in the main thread before sending the Condvar notification, but that doesn't guarantee that it won't end up in a deadlock.

How can this program be fixed? Am I understanding Condvars wrong?

use std::thread;
use std::sync::{Arc, Condvar, Mutex};

/// Shared state for a one-slot hand-off between `producer` and `consumer`.
struct Buffer {
    /// Flag set to `true` by the producer after it has stored a value and
    /// reset to `false` by the consumer once it has printed that value.
    is_data: Mutex<bool>,
    /// Condition variable the consumer waits on; the producer signals it
    /// after setting `is_data`.
    is_data_cv: Condvar,
    /// Flag set to `true` by the consumer after it has read the slot and
    /// reset to `false` by the producer when it fills the slot.
    is_space: Mutex<bool>,
    /// Condition variable the producer waits on; the consumer signals it
    /// after setting `is_space`.
    is_space_cv: Condvar,
    /// The single slot that carries the value from producer to consumer.
    buffer: Mutex<i32>,
}

/// Publishes the values `0..50` through the shared one-slot `buffer`.
///
/// For every value the producer waits until the `is_space` flag is set,
/// stores the value in the slot, clears `is_space`, then sets `is_data` and
/// signals `is_data_cv` so the value can be picked up.
///
/// # Panics
///
/// Panics if any of the mutexes in `buffer` is poisoned.
fn producer(buffer: Arc<Buffer>) {
    for i in 0..50 {
        {
            let mut is_space = buffer.is_space.lock().unwrap();
            // Test the flag *before* sleeping and again after every wake-up:
            // a notification sent while nobody is waiting is lost, and
            // `Condvar::wait` is allowed to return spuriously.
            while !*is_space {
                is_space = buffer.is_space_cv.wait(is_space).unwrap();
            }
            *buffer.buffer.lock().unwrap() = i;
            *is_space = false;
            // The `is_space` guard is dropped at the end of this scope so the
            // `is_data` mutex below is never taken while it is still held;
            // nesting the two flag locks in opposite orders in two threads
            // can deadlock.
        }

        *buffer.is_data.lock().unwrap() = true;
        buffer.is_data_cv.notify_one();
    }
}

/// Receives 50 values through the shared one-slot `buffer` and prints each
/// one on its own line.
///
/// For every value the consumer waits until the `is_data` flag is set, prints
/// the slot's content, clears `is_data`, then sets `is_space` and signals
/// `is_space_cv` so the slot can be refilled.
///
/// # Panics
///
/// Panics if any of the mutexes in `buffer` is poisoned.
fn consumer(buffer: Arc<Buffer>) {
    for _ in 0..50 {
        {
            let mut is_data = buffer.is_data.lock().unwrap();
            // Test the flag *before* sleeping and again after every wake-up:
            // a notification sent while nobody is waiting is lost, and
            // `Condvar::wait` is allowed to return spuriously.
            while !*is_data {
                is_data = buffer.is_data_cv.wait(is_data).unwrap();
            }
            let value = *buffer.buffer.lock().unwrap();
            println!("{}", value);
            *is_data = false;
            // The `is_data` guard is dropped at the end of this scope so the
            // `is_space` mutex below is never taken while it is still held;
            // nesting the two flag locks in opposite orders in two threads
            // can deadlock.
        }

        *buffer.is_space.lock().unwrap() = true;
        buffer.is_space_cv.notify_one();
    }
}

/// Runs one producer thread and one consumer thread over a shared one-slot
/// `Buffer` and waits for both of them to finish.
///
/// # Panics
///
/// Panics if either worker thread panicked.
fn main() {
    let buffer = Arc::new(Buffer {
        is_data: Mutex::new(false),
        is_data_cv: Condvar::new(),
        // The slot starts out empty, so there is space for the first value.
        is_space: Mutex::new(true),
        is_space_cv: Condvar::new(),
        buffer: Mutex::new(0),
    });

    let p = {
        let b = Arc::clone(&buffer);
        thread::spawn(move || producer(b))
    };
    let c = {
        let b = Arc::clone(&buffer);
        thread::spawn(move || consumer(b))
    };

    // Wakes a thread that is already blocked on `is_space_cv`; the call has
    // no effect when nobody is waiting at this moment.
    buffer.is_space_cv.notify_one();

    // `join` returns `Err` when the thread panicked; surface that instead of
    // silently discarding it.
    c.join().expect("consumer thread panicked");
    p.join().expect("producer thread panicked");
}
Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366

2 Answers

6

I would encourage you to create smaller methods and reuse existing Rust types such as Option. This will allow you to simplify your code quite a bit — only one Mutex and one Condvar:

use std::thread;
use std::sync::{Arc, Condvar, Mutex};

/// A one-slot hand-off buffer protected by a single mutex/condvar pair.
#[derive(Debug, Default)]
struct Buffer {
    /// `Some(value)` while a value is waiting to be removed, `None` while the
    /// slot is free.
    data: Mutex<Option<i32>>,
    /// Signalled after every change of `data`, by `insert` and `remove` alike.
    data_cv: Condvar,
}

impl Buffer {
    /// Stores `val` in the slot, blocking for as long as the slot is occupied.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn insert(&self, val: i32) {
        let mut lock = self.data.lock().expect("Can't lock");
        // Re-check after every wake-up: the wake-up may be spurious or may
        // have been meant for a thread waiting on the opposite condition.
        while lock.is_some() {
            lock = self.data_cv.wait(lock).expect("Can't wait");
        }
        *lock = Some(val);
        // Inserting and removing threads sleep on the same condvar, so
        // `notify_one` could wake another inserter, which would go straight
        // back to sleep while the remover keeps waiting. Waking everyone
        // keeps the buffer live with any number of threads on either side.
        self.data_cv.notify_all();
    }

    /// Takes the value out of the slot, blocking for as long as the slot is
    /// empty.
    ///
    /// # Panics
    ///
    /// Panics if the mutex is poisoned.
    fn remove(&self) -> i32 {
        let mut lock = self.data.lock().expect("Can't lock");
        while lock.is_none() {
            lock = self.data_cv.wait(lock).expect("Can't wait");
        }
        let val = lock
            .take()
            .expect("slot must be occupied: the wait loop only exits on `Some`");
        // See `insert` for why every waiter is woken.
        self.data_cv.notify_all();
        val
    }
}

/// Feeds the integers `0..50` into `buffer`, logging each one just before it
/// is handed over.
fn producer(buffer: &Buffer) {
    (0..50).for_each(|item| {
        println!("p: {}", item);
        buffer.insert(item);
    });
}

/// Pulls 50 values out of `buffer`, logging each one as soon as it arrives.
fn consumer(buffer: &Buffer) {
    // `map` is lazy, so every `remove` is immediately followed by its print.
    for received in (0..50).map(|_| buffer.remove()) {
        println!("c: {}", received);
    }
}

/// Starts one producer and one consumer on a shared default `Buffer`, then
/// waits for the consumer and the producer, in that order.
fn main() {
    let shared = Arc::new(Buffer::default());

    let producer_thread = {
        let handle = Arc::clone(&shared);
        thread::spawn(move || producer(&handle))
    };
    let consumer_thread = {
        let handle = Arc::clone(&shared);
        thread::spawn(move || consumer(&handle))
    };

    consumer_thread.join().expect("Consumer had an error");
    producer_thread.join().expect("Producer had an error");
}

If you wanted to have a bit more performance (benchmark to see if it's worth it), you could have Condvars for the "empty" and "full" conditions separately:

/// One-slot hand-off buffer with a dedicated condition variable per direction.
#[derive(Debug, Default)]
struct Buffer {
    /// The slot: `Some` while a value awaits removal, `None` while it is free.
    data: Mutex<Option<i32>>,
    /// Signalled by `remove` once the slot has been emptied.
    is_empty: Condvar,
    /// Signalled by `insert` once the slot has been filled.
    is_full: Condvar,
}

impl Buffer {
    /// Stores `val` in the slot, blocking for as long as the slot is occupied.
    fn insert(&self, val: i32) {
        let mut slot = self.data.lock().expect("Can't lock");
        loop {
            if slot.is_none() {
                break;
            }
            slot = self.is_empty.wait(slot).expect("Can't wait");
        }
        *slot = Some(val);
        self.is_full.notify_one();
    }

    /// Takes the value out of the slot, blocking for as long as the slot is
    /// empty.
    fn remove(&self) -> i32 {
        let mut slot = self.data.lock().expect("Can't lock");
        loop {
            // `take` on an empty slot leaves it untouched, so probing this
            // way is harmless while we keep waiting.
            match slot.take() {
                Some(val) => {
                    self.is_empty.notify_one();
                    return val;
                }
                None => slot = self.is_full.wait(slot).expect("Can't wait"),
            }
        }
    }
}
Shepmaster
  • 388,571
  • 95
  • 1,107
  • 1,366
  • Why only one `Condvar`? If you call `insert` twice, the second `insert` will get a wakeup for the `CondVar` notify from the first `insert`. I think two `CondVar`s are not obviously bad (maybe I'm missing something here). – Stefan Nov 29 '17 at 13:58
  • @Stefan mostly for simplicity. With only one `Condvar`, the space needed to reason about is much smaller. Added the alternative, look like what you want? – Shepmaster Nov 29 '17 at 14:07
  • Thanks for the fast answer. The code is also Rusty, which is something I need to improve – Adrián Arroyo Calle Nov 29 '17 at 14:27
  • Yup, looks nice. – Stefan Nov 29 '17 at 16:37
0

To improve the concurrency performance, you can add more slots in the buffer. The following example also supports multiple producers & consumers.

use std::sync::{Arc, Condvar, Mutex, MutexGuard};
use std::thread;

/// Number of slots in the buffer's backing array.
const MAX: usize = 10;

/// Bounded buffer shared between threads: the storage sits behind one mutex
/// and each direction has its own condition variable.
struct Buffer {
    /// Slot storage and bookkeeping.
    inner: Mutex<BufferInner>,
    /// Signalled by the producer after it has stored a value.
    fill_cond: Condvar,
    /// Signalled by the consumer after it has taken a value out.
    empty_cond: Condvar,
}

impl Buffer {
    /// Creates a buffer whose slots are all empty.
    fn new() -> Self {
        let storage = BufferInner {
            data: [None; MAX],
            filled: 0,
            used: 0,
            count: 0,
        };
        Self {
            inner: Mutex::new(storage),
            fill_cond: Condvar::new(),
            empty_cond: Condvar::new(),
        }
    }
}

/// Fixed-capacity FIFO storage backed by an array used as a ring.
struct BufferInner {
    /// Slot storage; a slot is `Some` exactly while it holds an unread value.
    data: [Option<i32>; MAX],
    /// Index of the slot the next `put` writes to.
    filled: usize,
    /// Index of the slot the next `get` reads from.
    used: usize,
    /// Number of values currently stored.
    count: usize,
}

impl BufferInner {
    /// Appends `value` at the write position.
    ///
    /// # Panics
    ///
    /// Panics if the buffer already holds `MAX` values; writing anyway would
    /// silently overwrite the oldest unread value.
    fn put(&mut self, value: i32) {
        assert!(self.count < MAX, "BufferInner::put called on a full buffer");
        self.data[self.filled] = Some(value);
        self.filled = (self.filled + 1) % MAX;
        self.count += 1;
    }

    /// Removes and returns the oldest stored value.
    ///
    /// # Panics
    ///
    /// Panics if the buffer is empty. The check happens before any index or
    /// counter is modified, so a failed call leaves the buffer unchanged.
    fn get(&mut self) -> i32 {
        // `take` clears the slot, so a consumed slot can never be mistaken
        // for a fresh value on a later call.
        let value = self.data[self.used]
            .take()
            .expect("BufferInner::get called on an empty buffer");
        self.used = (self.used + 1) % MAX;
        self.count -= 1;
        value
    }
}

/// Pushes the integers `0..20` into `buffer`, sleeping on `empty_cond`
/// whenever every slot is taken.
fn producer(buffer: &Buffer) {
    (0..20).for_each(|item| {
        let mut ring = buffer.inner.lock().unwrap();
        loop {
            if ring.count != MAX {
                break;
            }
            ring = buffer.empty_cond.wait(ring).unwrap();
        }

        ring.put(item);
        println!("producer: {}", item);
        // The mutex is still held here; it is released when `ring` goes out
        // of scope at the end of this iteration.
        buffer.fill_cond.notify_one();
    });
}

/// Pops 20 values from `buffer` and prints them, sleeping on `fill_cond`
/// whenever nothing is stored.
fn consumer(buffer: &Buffer) {
    (0..20).for_each(|_| {
        let mut ring: MutexGuard<BufferInner> = buffer.inner.lock().unwrap();
        loop {
            if ring.count != 0_usize {
                break;
            }
            ring = buffer.fill_cond.wait(ring).unwrap();
        }

        let received = ring.get();
        println!("consumer: {}", received);
        // The mutex is still held here; it is released when `ring` goes out
        // of scope at the end of this iteration.
        buffer.empty_cond.notify_one();
    });
}

/// Runs one producer thread and one consumer thread over a shared `Buffer`
/// and waits for the producer, then the consumer.
fn main() {
    let producer_side = Arc::new(Buffer::new());
    let consumer_side = Arc::clone(&producer_side);

    let producer_thread = thread::spawn(move || producer(&producer_side));
    let consumer_thread = thread::spawn(move || consumer(&consumer_side));

    producer_thread.join().unwrap();
    consumer_thread.join().unwrap();
}
Little Roys
  • 5,383
  • 3
  • 30
  • 28