1

I designed this simple fifo that stores a DequePolicy, which is simply something that can remove elements from the back if there are too much elements:

use super::deque_policy::DequePolicy;
use std::collections::VecDeque;
use std::sync::Arc;
use std::sync::Condvar;

pub struct BoundedFifo<T> {
    pub deque: VecDeque<T>,
    pub policy: Box<dyn DequePolicy<T>>,
    //pub condvar: Condvar
}

impl<T> BoundedFifo<T> {
    pub fn new(policy: Box<dyn DequePolicy<T>>) -> BoundedFifo<T> {
        BoundedFifo{
            deque: VecDeque::<T>::new(),
            policy: policy,
            //condvar: Condvar::new()
        }
    }
}

impl<T> BoundedFifo<T> {
    pub fn pop_front(&mut self) -> Option<T> {
        self.policy.before_pop_front(&self.deque);
        //self.condvar.notify_all();
        self.deque.pop_front()
    }
    pub fn push_back(&mut self, t: T) {
        self.deque.push_back(t);
        //self.condvar.notify_all();
        self.policy.after_push_back(&mut self.deque);
    }
}

I want to share this fifo between 2 threads, but there's a problem. I want one thread to be able to wait for elements to be present on the fifo. That's why I added the condvar that is commented right now.

I commented it because I quickly realized that to share the fifo between threads, I gotta put it inside an Arc<Mutex<>>. So in order to wait for condvar that is inside fifo: Arc<Mutex<BoundedFifo<u8>>> for example, I'd have to lock it on one thread: fifo.lock().unwrap().condvar.wait_timeout(...). This essentially prevents the other thread from writing so there would be no point in waiting.

I then had the idea to do:

type Fifo<T> = Arc<(Mutex<BoundedFifo<T>>, Condvar)>;

Now I can wait for a condvar, but I also have to remember of notifying this condvar whenever I call fifo: Fifo<T>'s push_back and pop_front. For example:

let pair = ...
let fifo = pair.0;
let condvar = pair.1;
fifo.push_back(0);
condvar.notify_all();

this is error prone as I could easily forget to call it. On C++ it'd let me wait for something inside the shared thing (even though this is unsafe).

I thought about creating macros push_back! and pop_front! that would push_back and pop_front together with calling notify_all on the condvar, but I don't find this very elegant.

I'm kinda new to Rust, so I'm asking for better solutions for this. Is it possible, for example, to implement push_back and pop_front for Fifo<T>? This way I avoid macros :slight_smile:. That's an idea.

Guerlando OCs
  • 1,886
  • 9
  • 61
  • 150
  • Why don't you turn `type Fifo` into a struct? Then you can implement push_back and pop_front for it. e.g. `struct Fifo(Arc<(Mutex>, Condvar)>)` – 8176135 Sep 28 '20 at 03:32
  • @Prime_Aqasix wouldn't I end up with the same problem? I think I'd have to wrap `Fifo` into an `Arc` to share it between threads – Guerlando OCs Sep 28 '20 at 03:57
  • Not really, just derive `Clone` on the `Fifo` type. See this: [Rust Playground](https://play.rust-lang.org/?version=stable&mode=debug&edition=2018&gist=0faf93f4bad4c882703c04a39482cf23). You also need to change `Box>` to `Box + Send>` to allow sending between threads. – 8176135 Sep 28 '20 at 04:23

2 Answers2

2

Have your threads pass around an Arc<BoundedFifo<T>>. It will use internal mutability and do all the mutex and condition stuff itself inside its public-facing methods, which only will need &self. All Condition, Mutex, and policy methods you will need only require immutable references to them.

And don't make the struct fields pub.

I don't understand what your policy object is doing.

pub struct BoundedFifo<T> {
    deque: Mutex<VecDeque<T>>,
    policy: Box<dyn DequePolicy<T>>,
    condition: Condition,
}

impl<T> BoundedFifo<T> {
    /// pops a value off the front, if one is available
    /// waits only long enough to get the lock
    pub fn pop_front_nonwaiting(&self) -> Option<T> {
        let lock = self.deque.lock().unwrap();
        (*lock).pop_front()
    }

    /// waits until an element is available, then pops it.
    pub fn pop_front_waiting(&self) -> T {

        let lock = self.deque.lock().unwrap();
        let lock = self.condition.wait_while(lock,
                                             |dq| dq.len() == 0));
        self.condition.notify_all();
        // this will be safe because we know len() != 0
        (*lock).pop_front().unwrap()
    }
    pub fn push_back(&self, t: T) {
        let lock = self.deque.lock().unwrap();
        (*lock).push_back(t);
        // overflow/discarding logic goes here
        self.condition.notify_all();
    }
}
NovaDenizen
  • 5,089
  • 14
  • 28
1

You can wrap almost any type inside a struct, and the new struct will share the same properties as the type it wraps (at least after deriving the appropriate traits).

You can then expose the methods you need from the underlying type.

In this case, instead of type Fifo<T> = Arc<(Mutex<BoundedFifo<T>>, Condvar)>;, we can create a struct Fifo<T>(Arc<(Mutex<BoundedFifo<T>>, Condvar)>);. And then implement the methods you want on the struct.

One thing to note will need to change Box<dyn DequePolicy<T>> to Box<dyn DequePolicy<T> + Send> to allow sharing between threads. See this question: Sending trait objects between threads in Rust for more information.

#[derive(Clone)]
struct FiFo<T>(Arc<(Mutex<BoundedFifo<T>>, Condvar)>);

impl<T> FiFo<T> {
    fn push_back(&self, item: T) {
        (self.0).0.lock().unwrap().push_back(item);
        (self.0).1.notify_all();
    }
}

fn main() {
    let mut bounded_fifo = BoundedFifo::new(Box::new(DequePolicyThing));

    let pair = FiFo(Arc::new((Mutex::new(bounded_fifo), Condvar::new())));

    let pair2 = pair.clone();

    std::thread::spawn(move || {
        pair2.push_back("abc".to_owned());
    });

    // The first .0 is to access the Arc inside FiFo, the second is for the Mutex.
    let mut bounded = (pair.0).0.lock().unwrap(); 

    bounded = (pair.0).1.wait(bounded).unwrap();
    println!("Notified {:?}", bounded.pop_front());
}

Rust Playground

8176135
  • 3,755
  • 3
  • 19
  • 42