
I have the following code, which uses a bounded crossbeam channel whose capacity (20) is smaller than the total amount of data (32) that I want to send through it. My goal is to have multiple sender threads (8) each send a fixed number (4) of values to a single receiver, with all of this happening in parallel for efficiency. This is just a small prototype of a bigger problem that I'm trying to solve. However, the program hangs and never exits or times out. I also know why it's happening: r.iter() blocks until the sender is dropped, which only happens outside of the scope. I tried various approaches that didn't work:

  1. Cloning the sender within each thread and then dropping the clones (as you can see in the comments)

  2. Moving the receiver code outside the scope, but then the final vector contained only 20 elements instead of the desired 32.

fn main() {
    use crossbeam_channel::{unbounded, bounded};
    use crossbeam_utils::thread::scope;
    use itertools::Itertools;

    let (s, r) = bounded(20);
    let mut v = vec![];

    scope(|scope| {
        scope.spawn(|_| {
            for data in r.iter() {
                v.push(data);
            }
        });
        let _sender_threads = (0..8)
            .into_iter()
            .map(|_| {
                scope.spawn(|_| {
                    // let s_clone = s.clone();
                    for i in 0..4 {
                        // std::thread::sleep(std::time::Duration::from_millis(100));
                        match s.send(i) {
                            Ok(_) => {
                                // println!("sent i {:?}", i);
                                ()
                            },
                            Err(_)=> {
                                // println!("error sending i {:?}", i);
                                ()
                            }
                        };
                    }
                    // drop(s_clone);
                })
            })
            .collect_vec();
    }).expect("scope error.");
    drop(s);
    println!("{:?}", v);
}

1 Answer


This is happening because s isn't dropped until the scope ends, but the scope won't end until all threads have exited, and the thread calling r.iter() won't exit until s is dropped. This is a classic deadlock scenario.
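
The condition the receiver is waiting for can be observed in isolation. The following is a minimal sketch, not the code from the question: it assumes the standard library's `std::sync::mpsc::sync_channel` as a dependency-free stand-in for crossbeam's `bounded` (crossbeam's `try_recv` distinguishes the same two states), and `probe_empty_channel` is a hypothetical helper name.

```rust
use std::sync::mpsc::{sync_channel, TryRecvError};

/// Returns what a non-blocking receive reports on an empty channel,
/// first while the sender is alive and then after it has been dropped.
fn probe_empty_channel() -> (TryRecvError, TryRecvError) {
    // Assumption: std's bounded channel stands in for crossbeam's `bounded(20)`.
    let (tx, rx) = sync_channel::<i32>(20);

    // Sender alive: the channel is merely empty, so a blocking receive
    // (or an iterator over the receiver) would keep waiting at this point.
    let while_alive = rx.try_recv().unwrap_err();

    // Last sender dropped: the channel is disconnected, so iteration would end.
    drop(tx);
    let after_drop = rx.try_recv().unwrap_err();

    (while_alive, after_drop)
}

fn main() {
    let (while_alive, after_drop) = probe_empty_channel();
    println!("sender alive:   {:?}", while_alive);
    println!("sender dropped: {:?}", after_drop);
}
```

As long as any sender exists, the receiver only ever sees "empty", which is why the iterating thread in the question never returns.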

You need to drop s inside the scope, but the sender threads borrow s, so it can only be dropped once they have all exited. As the code is currently written, you therefore can't call drop(s) inside the scope.

The simplest way around this is to clone s for each sender thread and move the clone into the thread's closure, then drop s in the main scope afterwards.

fn main() {
    use crossbeam_channel::bounded;
    use crossbeam_utils::thread::scope;
    use itertools::Itertools;

    let (s, r) = bounded(20);
    let mut v = vec![];

    scope(|scope| {
        scope.spawn(|_| {
            for data in r.iter() {
                v.push(data);
            }
        });
        let _sender_threads = (0..8)
            .into_iter()
            .map(|_| {
                // ** Clone the sender, move it into the thread:
                let s = s.clone();
                scope.spawn(move |_| {
                    // let s_clone = s.clone();
                    for i in 0..4 {
                        // std::thread::sleep(std::time::Duration::from_millis(100));
                        match s.send(i) {
                            Ok(_) => {
                                // println!("sent i {:?}", i);
                                ()
                            },
                            Err(_)=> {
                                // println!("error sending i {:?}", i);
                                ()
                            }
                        };
                    }
                    // drop(s_clone);
                })
            })
            .collect_vec();

        // ** Drop the remaining sender.
        drop(s);
    }).expect("scope error.");
    println!("{:?}", v);
}

Note the addition of let s = s.clone(); and the change to the following closure by adding move so the closure takes ownership of the clone. Then we move drop(s) into the scope. Now once all sender threads have exited, the sending side of the channel is closed, and the receiver's for loop will terminate.
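
For comparison, the same clone-then-drop pattern can be sketched with only the standard library. This is an illustration under stated assumptions rather than a drop-in replacement: it assumes Rust 1.63 or later for `std::thread::scope`, uses `std::sync::mpsc::sync_channel` in place of crossbeam's `bounded`, and `collect_from_senders` is a hypothetical helper name. Because the standard library's `Receiver` is not `Sync`, the receiver is moved into its thread and the collected values come back through the join handle.

```rust
use std::sync::mpsc::sync_channel;
use std::thread;

/// Spawns `num_senders` threads that each send `0..per_sender` through a
/// bounded channel and returns everything the receiver thread collected.
fn collect_from_senders(num_senders: usize, per_sender: i32, capacity: usize) -> Vec<i32> {
    let (tx, rx) = sync_channel::<i32>(capacity);

    thread::scope(|scope| {
        // The receiver thread owns `rx`. Its iterator ends once every sender
        // has been dropped and the queue has been drained.
        let receiver = scope.spawn(move || rx.iter().collect::<Vec<i32>>());

        for _ in 0..num_senders {
            // Each sender thread owns its own clone of the sender.
            let tx = tx.clone();
            scope.spawn(move || {
                for i in 0..per_sender {
                    tx.send(i).expect("receiver hung up early");
                }
                // The clone is dropped when this closure returns.
            });
        }

        // Drop the original sender so only the clones keep the channel open.
        drop(tx);

        receiver.join().expect("receiver thread panicked")
    })
}

fn main() {
    let mut v = collect_from_senders(8, 4, 20);
    v.sort();
    println!("{} values: {:?}", v.len(), v);
}
```

The order in which values arrive depends on thread scheduling, so the example sorts the result before printing it.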


cdhowie
  • Thank you! It works for this example. However, since operations inside the scope can be interleaved (because we're spawning threads), the `drop(s)` can happen prematurely and that can cause `v` to not have all the data, correct? – Sachin Paryani Mar 09 '22 at 17:31
  • @SachinParyani No. Once all senders are dropped, if there are still sent items in the receiver's queue, the iterator will yield them before it finishes. From [the docs](https://docs.rs/crossbeam/latest/crossbeam/channel/struct.Receiver.html#method.iter): _"Each call to next blocks waiting for the next message and then returns it. However, if the channel becomes empty and disconnected, it returns None without blocking."_ Note in particular, "empty and disconnected." – cdhowie Mar 09 '22 at 17:35
  • I think you may have misunderstood my question, probably because I phrased it poorly or because my understanding of scoped threads is shaky. What I want to know is whether `drop(s)` can ever be called before all the senders are done. For example, when a sender is doing some IO instead of sending `i`, can that thread be preempted so that execution switches to the code outside of the map and calls `drop(s)`? – Sachin Paryani Mar 09 '22 at 17:43
  • @SachinParyani Yes, but it does not matter. That only drops the sender owned by `main()`. Recall that we cloned `s` and gave a clone to each thread. Each sender thread owns a sender to send into the same channel. Before the call to `drop(s)` there are therefore _nine_ `Sender` values in existence (the original plus one clone for each of the eight sender threads, assuming no sender threads have yet exited). Dropping `s` only removes one `Sender`. _All_ senders need to be dropped to close the sending side of the channel. – cdhowie Mar 09 '22 at 17:48
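
The two points made in these comments (dropping one sender does not disconnect the channel, and queued items are still delivered after the last sender is gone) can be checked on a single thread. This is a rough sketch under assumptions: it uses the standard library's `std::sync::mpsc::sync_channel` as a stand-in for crossbeam's `bounded`, and `sender_lifecycle` is a hypothetical helper name.

```rust
use std::sync::mpsc::{sync_channel, TryRecvError};

/// Returns (state after dropping only the original sender,
/// values drained after all senders are gone, final state).
fn sender_lifecycle() -> (TryRecvError, Vec<i32>, TryRecvError) {
    let (tx, rx) = sync_channel::<i32>(20);

    // One clone per would-be sender thread: 8 clones + the original = 9 senders.
    let clones: Vec<_> = (0..8).map(|_| tx.clone()).collect();

    // Dropping the original leaves 8 live senders, so the channel stays connected.
    drop(tx);
    let after_original_dropped = rx.try_recv().unwrap_err();

    // Each clone queues one value, then every clone is dropped.
    for (i, sender) in clones.iter().enumerate() {
        sender.send(i as i32).expect("receiver is still alive");
    }
    drop(clones);

    // Queued values survive the disconnect: iteration ends only once the
    // channel is both empty and disconnected.
    let drained: Vec<i32> = rx.iter().collect();
    let after_drain = rx.try_recv().unwrap_err();

    (after_original_dropped, drained, after_drain)
}

fn main() {
    let (after_original_dropped, drained, after_drain) = sender_lifecycle();
    println!("after drop(original): {:?}", after_original_dropped);
    println!("drained after all senders dropped: {:?}", drained);
    println!("after draining: {:?}", after_drain);
}
```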