A thread-local variable can ensure that local_store is created at most once in any given thread.
For example, this compiles:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use std::cell::RefCell;

    // One lazily initialized clone of the store per worker thread.
    thread_local!(static STORE: RefCell<Option<Store>> = RefCell::new(None));

    let mut result = Vec::new();
    txs.par_iter().map(|tx| {
        STORE.with(|cell| {
            let mut local_store = cell.borrow_mut();
            // Clone the store only the first time this thread gets here.
            if local_store.is_none() {
                *local_store = Some(store.clone());
            }
            tx.verify_and_store(local_store.as_mut().unwrap())
        })
    }).collect_into(&mut result); // current Rayon releases call this collect_into_vec
}
There are two problems with this code, however. First, if the clones of store need to do something when par_iter() is done, such as flush their buffers, it simply won't happen: their Drop will only be called when Rayon's worker threads exit, and even that is not guaranteed.
The second and more serious problem is that the clones of store are created exactly once per worker thread. If Rayon caches its thread pool (and I believe it does), an unrelated later call to verify_and_store will keep working with the clones left over from an earlier call, which may have nothing to do with the current store.
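The second problem can be reproduced without Rayon. The following is a minimal sketch using only the standard library, in which a single long-lived thread stands in for a cached pool worker; the names CACHED, cached_or_init and run_on_reused_worker are made up for this illustration and a String stands in for the store.

```rust
use std::cell::RefCell;
use std::sync::mpsc;
use std::thread;

thread_local! {
    // One cached value per thread, filled in lazily on first use.
    static CACHED: RefCell<Option<String>> = RefCell::new(None);
}

/// Returns this thread's cached value, initializing it from `current`
/// only if the cache is still empty (same shape as the code above).
fn cached_or_init(current: &str) -> String {
    CACHED.with(|cell| {
        let mut slot = cell.borrow_mut();
        if slot.is_none() {
            *slot = Some(current.to_string());
        }
        slot.as_ref().unwrap().clone()
    })
}

/// Sends each job to ONE long-lived worker thread (a stand-in for a pooled
/// worker) and returns the value the worker actually used for each job.
fn run_on_reused_worker(jobs: &[&str]) -> Vec<String> {
    let (job_tx, job_rx) = mpsc::channel::<String>();
    let (res_tx, res_rx) = mpsc::channel::<String>();
    let worker = thread::spawn(move || {
        // The loop ends when the sending side of the job channel is dropped.
        for job in job_rx {
            res_tx.send(cached_or_init(&job)).unwrap();
        }
    });
    let mut seen = Vec::new();
    for job in jobs {
        job_tx.send(job.to_string()).unwrap();
        seen.push(res_rx.recv().unwrap());
    }
    drop(job_tx); // close the channel so the worker exits
    worker.join().unwrap();
    seen
}
```

Calling run_on_reused_worker(&["store A", "store B"]) yields "store A" for both jobs: the second job, which belongs to a different store, silently reuses the value cached by the first.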
This can be rectified by complicating the code somewhat:
First, store the cloned variables in a Mutex<Option<...>> instead of a plain Option, so that they can be accessed by the thread that invoked par_iter(). This incurs a mutex lock on every access, but the lock will be uncontended and therefore cheap.
Second, use an Arc around the mutex in order to collect references to the created store clones in a vector. This vector is used to clean up the stores by resetting them to None after the iteration has finished.
Third, wrap the whole call in an unrelated mutex, so that two parallel calls to verify_and_store don't end up seeing each other's store clones. (This might be avoidable if a new thread pool were created and installed before the iteration.) Hopefully this serialization won't hurt the performance of verify_and_store, since each call will utilize the whole thread pool.
The result is not pretty, but it compiles, uses only safe code, and appears to work:
fn verify_and_store(store: &mut Store, txs: Vec<Tx>) {
    use std::sync::{Arc, Mutex};

    type SharedStore = Arc<Mutex<Option<Store>>>;

    lazy_static! {
        // Every thread-local slot that was filled during the current call.
        static ref STORE_CLONES: Mutex<Vec<SharedStore>> = Mutex::new(Vec::new());
        // Serializes calls so that they never see each other's clones.
        static ref NO_REENTRY: Mutex<()> = Mutex::new(());
    }
    thread_local!(static STORE: SharedStore = Arc::new(Mutex::new(None)));

    let mut result = Vec::new();
    // The guard is held until the function returns.
    let _no_reentry = NO_REENTRY.lock();
    txs.par_iter().map(|tx| {
        STORE.with(|arc_mtx| {
            let mut local_store = arc_mtx.lock().unwrap();
            if local_store.is_none() {
                *local_store = Some(store.clone());
                // Register the slot so the caller can reset it afterwards.
                STORE_CLONES.lock().unwrap().push(arc_mtx.clone());
            }
            tx.verify_and_store(local_store.as_mut().unwrap())
        })
    }).collect_into(&mut result); // current Rayon releases call this collect_into_vec

    // Drop every clone now, on the calling thread, leaving the slots empty.
    let mut store_clones = STORE_CLONES.lock().unwrap();
    for store in store_clones.drain(..) {
        store.lock().unwrap().take();
    }
}
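To see the cleanup step in isolation, here is a self-contained sketch of the same registry-and-reset pattern using only the standard library. It is not the code above: plain threads held open by a Barrier stand in for Rayon's workers, a plain static Mutex replaces lazy_static (assuming Rust 1.63 or later, where Mutex::new is const), and Buffer, FLUSHED, record, reset_all and demo are hypothetical names introduced for the illustration.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Barrier, Mutex};
use std::thread;

// Counts dropped buffers so that the "flush" is observable.
static FLUSHED: AtomicUsize = AtomicUsize::new(0);

/// Hypothetical stand-in for a store clone that must flush when done.
struct Buffer {
    items: Vec<u32>,
}

impl Drop for Buffer {
    fn drop(&mut self) {
        FLUSHED.fetch_add(1, Ordering::SeqCst);
    }
}

type Shared = Arc<Mutex<Option<Buffer>>>;

// Registry of every thread-local slot that currently holds a buffer.
static REGISTRY: Mutex<Vec<Shared>> = Mutex::new(Vec::new());

thread_local!(static LOCAL: Shared = Arc::new(Mutex::new(None)));

/// Appends `item` to the calling thread's buffer, creating and
/// registering the buffer on first use.
fn record(item: u32) {
    LOCAL.with(|shared| {
        let mut slot = shared.lock().unwrap();
        if slot.is_none() {
            *slot = Some(Buffer { items: Vec::new() });
            REGISTRY.lock().unwrap().push(Arc::clone(shared));
        }
        slot.as_mut().unwrap().items.push(item);
    })
}

/// Empties every registered slot from the calling thread and returns the
/// total number of items that had been recorded.
fn reset_all() -> usize {
    let mut registry = REGISTRY.lock().unwrap();
    let mut total = 0;
    for shared in registry.drain(..) {
        let taken = shared.lock().unwrap().take();
        if let Some(buffer) = taken {
            total += buffer.items.len();
        } // `buffer` is dropped here, which runs its Drop
    }
    total
}

/// Runs two workers that stay alive while the caller resets their buffers.
/// Returns (flush count before reset, items recorded, flush count after).
fn demo() -> (usize, usize, usize) {
    let barrier = Arc::new(Barrier::new(3));
    let workers: Vec<_> = (0..2u32)
        .map(|id| {
            let barrier = Arc::clone(&barrier);
            thread::spawn(move || {
                record(id * 10);
                record(id * 10 + 1);
                barrier.wait(); // everything has been recorded
                barrier.wait(); // stay alive until the caller has reset
            })
        })
        .collect();
    barrier.wait();
    let before = FLUSHED.load(Ordering::SeqCst);
    let total = reset_all();
    let after = FLUSHED.load(Ordering::SeqCst);
    barrier.wait();
    for worker in workers {
        worker.join().unwrap();
    }
    (before, total, after)
}
```

In demo, both buffers are dropped by the calling thread inside reset_all while the two workers are still running, which is the property the first problem was about: cleanup no longer depends on the worker threads exiting.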