
I am trying to optimize my function using Rayon's par_iter().

The single threaded version is something like:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    let result: Vec<_> = txs.iter().map(|tx| {
        tx.verify_and_store(store)
    }).collect();
    ...
}

Each Store instance must be used by only one thread, but multiple instances of Store can be used concurrently, so I can make this multithreaded by cloning store:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    let result: Vec<_> = txs.par_iter().map(|tx| {
        let mut local_store = store.clone();
        tx.verify_and_store(&mut local_store)
    }).collect();
    ...
}

However, this clones the store on every iteration, which is way too slow. I would like to use one store instance per thread.

Is this possible with Rayon? Or should I resort to manual threading and a work-queue?

Matthieu M.
Tomas

2 Answers


Old question, but I feel the answer needs revisiting. In general, there are two methods:

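Use map_with. It pairs each rayon job with its own clone of the initial value, so the store is cloned only when rayon splits off a new job (for example when an idle thread steals part of the work), not once per item. This may create somewhat more stores than there are threads, but the number should stay fairly low. If the clones are still too expensive, you can make rayon split the work into larger pieces with with_min_len. (map_init is the variant that takes a closure, such as || store.clone(), and calls it as needed for each job instead of cloning a value.)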

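fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use rayon::prelude::*;

    // `map_with` takes a value (not a closure) and clones it per rayon job.
    let result: Vec<_> = txs
        .par_iter()
        .map_with(store.clone(), |store, tx| tx.verify_and_store(store))
        .collect();
    ...
}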
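Rayon can't be used in a std-only snippet, so the following is only a rough imitation of the idea behind map_with plus with_min_len: the items are cut into chunks of min_len (the last chunk may be shorter) and each chunk gets exactly one clone. Store, Tx, their fields and verify_chunked are hypothetical stand-ins, and unlike rayon this spawns one scoped thread per chunk instead of reusing a pool.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Counts every Store clone so the effect of chunking is visible.
static CLONES: AtomicUsize = AtomicUsize::new(0);

// Hypothetical stand-ins for the question's Store and Tx.
struct Store {
    verified: Vec<u32>,
}

impl Clone for Store {
    fn clone(&self) -> Self {
        CLONES.fetch_add(1, Ordering::SeqCst);
        Store { verified: self.verified.clone() }
    }
}

struct Tx(u32);

impl Tx {
    // Hypothetical check: even ids are "valid"; every id is recorded in the store.
    fn verify_and_store(&self, store: &mut Store) -> bool {
        store.verified.push(self.0);
        self.0 % 2 == 0
    }
}

/// Clones `store` once per chunk of `min_len` items (the last chunk may be
/// shorter), loosely like rayon's `map_with` combined with `with_min_len`.
fn verify_chunked(store: &Store, txs: &[Tx], min_len: usize) -> Vec<bool> {
    thread::scope(|s| {
        let handles: Vec<_> = txs
            .chunks(min_len.max(1))
            .map(|chunk| {
                let mut local = store.clone(); // one clone per chunk, not per item
                s.spawn(move || {
                    chunk
                        .iter()
                        .map(|tx| tx.verify_and_store(&mut local))
                        .collect::<Vec<_>>()
                })
            })
            .collect();
        // Join in spawn order so the results line up with `txs`.
        handles.into_iter().flat_map(|h| h.join().unwrap()).collect()
    })
}
```

With ten transactions and min_len = 4 there are three chunks, hence three clones instead of ten; a larger min_len trades parallelism for fewer clones.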

Or use ThreadLocal from the thread_local crate. Unlike the thread_local! macro, it is an ordinary scoped value: it creates at most one object per thread that touches it, and those objects are dropped once the ThreadLocal goes out of scope.

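fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use rayon::prelude::*;
    use std::cell::RefCell;
    use thread_local::ThreadLocal;

    let tl = ThreadLocal::new();
    let result: Vec<_> = txs.par_iter().map(|tx| {
        // Created on each thread's first use. thread_local 1.x expects the
        // value itself here; older releases of the crate expected a Box.
        let local = tl.get_or(|| RefCell::new(store.clone()));
        // `get_or` returns a shared reference, so borrow the RefCell mutably.
        tx.verify_and_store(&mut local.borrow_mut())
    }).collect();
    ...
}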
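The reason for borrow_mut() rather than get_mut(): ThreadLocal::get_or hands back a shared reference, and RefCell::get_mut needs a &mut RefCell. This std-only sketch (the thread_local crate itself isn't used; Store, record and use_shared are hypothetical) shows how a &mut Store is obtained through a shared &RefCell<Store>.

```rust
use std::cell::RefCell;

// Hypothetical stand-in for the question's Store.
#[derive(Default)]
struct Store {
    count: usize,
}

// Hypothetical function that, like Tx::verify_and_store, needs `&mut Store`.
fn record(store: &mut Store) {
    store.count += 1;
}

/// Receives only a shared reference, as `ThreadLocal::get_or` provides.
/// `cell.get_mut()` would not compile here because it needs `&mut RefCell`.
fn use_shared(cell: &RefCell<Store>) {
    // `&mut RefMut<Store>` deref-coerces to `&mut Store`; the runtime borrow
    // ends at the end of this statement.
    record(&mut cell.borrow_mut());
}
```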
Mike Pedersen
  • Nice answer, I've seen it only now. What's the reason for boxing the `RefCell` returned by the closure passed to `ThreadLocal::get_or()`? – user4815162342 Jul 30 '21 at 14:25

It is possible to use a thread-local variable to ensure that local_store is not created more than once in a given thread.

For example, this compiles (full source):

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use rayon::prelude::*;
    use std::cell::RefCell;
    thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));

    let mut result = Vec::new();

    txs.par_iter().map(|tx| {
        STORE.with(|cell| {
            let mut local_store = cell.borrow_mut();
            // Clone `store` only the first time this worker thread gets here.
            if local_store.is_none() {
                *local_store = Some(store.clone());
            }
            tx.verify_and_store(local_store.as_mut().unwrap())
        })
    }).collect_into_vec(&mut result);
}
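To check the "once per thread" behavior without Rayon, here is a std-only sketch using the same thread_local! pattern, with plain scoped threads standing in for Rayon's workers. Store, its clone counter and with_local_store are hypothetical, and get_or_insert_with replaces the is_none()/unwrap() pair.

```rust
use std::cell::RefCell;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;

// Counts Store clones so the once-per-thread behavior can be checked.
static CLONES: AtomicUsize = AtomicUsize::new(0);

// Hypothetical stand-in for the question's Store.
struct Store {
    id: u32,
}

impl Clone for Store {
    fn clone(&self) -> Self {
        CLONES.fetch_add(1, Ordering::SeqCst);
        Store { id: self.id }
    }
}

thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));

/// Runs `f` on this thread's private clone of `store`, cloning only on the
/// thread's first call.
fn with_local_store<R>(store: &Store, f: impl FnOnce(&mut Store) -> R) -> R {
    STORE.with(|cell| {
        let mut slot = cell.borrow_mut();
        f(slot.get_or_insert_with(|| store.clone()))
    })
}
```

Three threads making five calls each produce three clones, not fifteen.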

There are two problems with this code, however. First, if the clones of store need to do something when par_iter() is done, such as flush their buffers, it simply won't happen: their Drop will only run when Rayon's worker threads exit, and even that is not guaranteed.

The second, more serious problem is that the clones of store are created exactly once per worker thread. Rayon's global thread pool persists across calls, so an unrelated later call to verify_and_store will keep working with the clones left over from an earlier call, which may have nothing to do with the current store.
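The staleness is easy to reproduce without Rayon: a single long-lived thread plays the role of a persistent pool worker and is handed two different stores in turn. Store and name_seen_by_worker are hypothetical; the caching logic matches the first version above.

```rust
use std::cell::RefCell;

// Hypothetical Store identified by a name.
#[derive(Clone)]
struct Store {
    name: String,
}

thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));

/// Mimics the first `verify_and_store`: the per-thread clone is created once
/// and then reused, regardless of which `store` the caller passes.
fn name_seen_by_worker(store: &Store) -> String {
    STORE.with(|cell| {
        let mut slot = cell.borrow_mut();
        slot.get_or_insert_with(|| store.clone()).name.clone()
    })
}
```

The second call passes store b, yet the thread still sees its cached clone of a.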

This can be rectified by complicating the code somewhat:

  • Store the cloned variables in a Mutex<Option<...>> instead of Option, so that they can be accessed by the thread that invoked par_iter(). This will incur a mutex lock on every access, but the lock will be uncontested and therefore cheap.

  • Use an Arc around the mutex in order to collect references to the created store clones in a vector. This vector is used to clean up the stores by resetting them to None after the iteration has finished.

  • Wrap the whole call in a separate global mutex, so that two parallel calls to verify_and_store don't see each other's store clones. (This might be avoidable by creating and installing a new thread pool before the iteration.) Hopefully this serialization won't hurt the performance of verify_and_store, since each call already uses the whole thread pool.

The result is not pretty, but it compiles, uses only safe code, and appears to work:

fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use lazy_static::lazy_static;
    use rayon::prelude::*;
    use std::sync::{Arc, Mutex};
    type SharedStore = Arc<Mutex<Option<Store>>>;

    lazy_static! {
        // Every per-thread slot that received a clone during this call.
        static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
        // Serializes calls so they never see each other's clones.
        static ref NO_REENTRY: Mutex<()> = Mutex::new(());
    }
    thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));

    let mut result = Vec::new();
    let _no_reentry = NO_REENTRY.lock().unwrap();

    txs.par_iter().map(|tx| {
        STORE.with(|arc_mtx| {
            let mut local_store = arc_mtx.lock().unwrap();
            if local_store.is_none() {
                *local_store = Some(store.clone());
                STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
            }
            tx.verify_and_store(local_store.as_mut().unwrap())
        })
    }).collect_into_vec(&mut result);

    // Drop all clones so the next call starts from its own `store`.
    let mut store_clones = STORE_CLONES.lock().unwrap();
    for shared in store_clones.drain(..) {
        shared.lock().unwrap().take();
    }
}
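A std-only sketch of the same registry-and-cleanup scheme, runnable without Rayon or lazy_static: since Rust 1.63 Mutex::new is const, so plain statics suffice. Here one scoped helper thread plus the calling thread stand in for the pool; the calling thread's thread-local slot outlives the call, just like a pool worker's. Store, Tx, verify_one and this verify_and_store signature are hypothetical.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

// Hypothetical stand-ins for the question's types.
#[derive(Clone)]
struct Store {
    name: String,
}

struct Tx(u32);

impl Tx {
    fn verify_and_store(&self, store: &mut Store) -> String {
        format!("{}:{}", store.name, self.0)
    }
}

type SharedStore = Arc<Mutex<Option<Store>>>;

// `Mutex::new` is const since Rust 1.63, so no lazy_static is needed.
static STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
static NO_REENTRY: Mutex<()> = Mutex::new(());

thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));

fn verify_one(store: &Store, tx: &Tx) -> String {
    STORE.with(|arc_mtx| {
        let mut local = arc_mtx.lock().unwrap();
        if local.is_none() {
            *local = Some(store.clone());
            // Register this thread's slot so the caller can clear it later.
            STORE_CLONES.lock().unwrap().push(Arc::clone(arc_mtx));
        }
        tx.verify_and_store(local.as_mut().unwrap())
    })
}

fn verify_and_store(store: &Store, txs: &[Tx]) -> Vec<String> {
    let _no_reentry = NO_REENTRY.lock().unwrap();
    let (first, second) = txs.split_at(txs.len() / 2);
    let result = thread::scope(|s| {
        // Second half on a helper thread, first half on the calling thread.
        let helper = s.spawn(move || {
            second.iter().map(|tx| verify_one(store, tx)).collect::<Vec<_>>()
        });
        let mut out: Vec<String> = first.iter().map(|tx| verify_one(store, tx)).collect();
        out.extend(helper.join().unwrap());
        out
    });
    // Reset every registered slot so a later call cannot see stale clones.
    for slot in STORE_CLONES.lock().unwrap().drain(..) {
        slot.lock().unwrap().take();
    }
    result
}
```

Calling it first with store a and then with store b on the same thread yields results tagged with b the second time, which is exactly what the cleanup loop buys.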
user4815162342
  • It's a shame there doesn't seem to be anything scoped to this call (though this is clearly useful in a decent subset of cases). – Chris Emerson Mar 08 '17 at 09:50
  • @ChrisEmerson Yes, what worries me about this answer is that I can't think of a way to clean up the created stores (or run other arbitrary commands when everything is done, such as flush them to disk) using safe code. Worse, the next call to `verify_and_store` will continue working with **last** known `Store` clones, which possibly have nothing to do with the current `store`. – user4815162342 Mar 08 '17 at 12:20
  • Thanks. This works but in my particular case I've found that Rayon has `par_chunks` to reduce the number of clones. Although this might still result in multiple clones per thread, it doesn't have the scope problem @user4815162342 is describing. – Tomas Mar 08 '17 at 13:05
  • @ChrisEmerson I've now updated the answer to add scoping and proper cleanup; the code is not elegant (to say the least), but it appears to work! – user4815162342 Mar 08 '17 at 21:01
  • @Tomas You might want to add your solution as an answer, and accept it. My answer is a fun exploration of Rust/Rayon's support for thread-local code, while your `par_chunks` solution seems to do a good and elegant job of nailing the actual problem. – user4815162342 Mar 08 '17 at 21:30