
I'm iterating over several gigabytes of input items from a database. On each input item, I'm doing some CPU-intensive processing which produces one or more new output items, tens of gigabytes in total. The output items are then stored in another database table.

I have gotten a nice speedup by using Rayon for parallel processing. However, the database API is not thread-safe; it's Send but not Sync, so the I/O must be serialized.

Ideally, I would just want to write:

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .ser_bridge() // End parallelism. This function does not exist.
    .for_each(|output_item| {
        output_database.write_item(output_item);
    });

Basically I want the opposite of par_bridge(); something that runs on the thread where it's called, reads items from each thread, and produces them serially. But in the current implementation of Rayon, this doesn't seem to exist. I'm not sure whether this is because it's theoretically impossible, or whether it doesn't fit into the current design of the library.

The output is too big to collect into a Vec first; it needs to be streamed into the database directly.

By the way, I'm not married to Rayon; if there's another crate that is more suitable, I'm happy to make the switch.

Thomas

2 Answers


You can wrap your output database in an Arc<Mutex> so that accesses to it are serialized:

use std::sync::{Arc, Mutex};

let output_database = Arc::new(Mutex::new(output_database));
input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .for_each_with(output_database, |output_database, output_item| {
        // lock() returns a Result; unwrap() panics if the mutex was poisoned.
        output_database.lock().unwrap().write_item(output_item);
    });
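Note that the mutex only serializes the writes: the CPU-intensive work inside flat_map_iter still runs in parallel, so the threads contend for the lock only for the duration of each write_item call.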
Jmb

I assume the order doesn't matter, i.e. the output items don't need to be written to the database in any particular order.

You could use an mpsc::channel to transfer your data from the for_each closure to your database API, e.g.

use std::sync::mpsc;

let (tx, rx) = mpsc::channel();

input_database
    .read_items()
    .par_bridge() // Start parallelism.
    .flat_map_iter(|input_item| {
        // produce an Iterator<Item = OutputItem>
    })
    .for_each_with(tx, |tx, output_item| {
        // for_each_with gives each Rayon task its own clone of the sender,
        // since the closure itself must be shareable across threads.
        tx.send(output_item).unwrap();
    });

and in a second thread you can use rx to receive the items serially and write them to the database.
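A minimal sketch of that receiving side, assuming the output_database and write_item API from the question; the writer thread is spawned before the parallel pipeline is started:

use std::thread;

// Spawn the writer first, so the channel is drained while the
// parallel pipeline produces items.
let writer = thread::spawn(move || {
    // The loop ends once every sender clone has been dropped,
    // i.e. once for_each_with above has finished.
    for output_item in rx {
        output_database.write_item(output_item);
    }
});

// ... run the par_bridge() pipeline shown above on this thread ...

writer.join().unwrap();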

hellow
    True, order doesn't matter in my case. I ended up with something similar except using a `crossbeam::channel`. – Thomas Mar 10 '21 at 09:21
  • crossbeam has better performance, indeed. I used std for simplicity reasons. – hellow Mar 10 '21 at 09:38
  • @Thomas You are probably aware of that, but just in case: if you use channels, consider using a bounded channel to provide backpressure and avoid a memory leak when the channel consumer is slower than the producer. – user4815162342 Mar 10 '21 at 10:31
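For reference, a minimal sketch of that bounded variant, assuming std's mpsc::sync_channel (the capacity of 1024 is purely illustrative); send() then blocks the producers whenever the queue is full, throttling them to the writer's pace:

use std::sync::mpsc;

// Bounded channel: send() blocks once 1024 items are queued, so the
// parallel producers cannot outrun the single writer unboundedly.
let (tx, rx) = mpsc::sync_channel(1024);

With crossbeam, crossbeam_channel::bounded(1024) provides the same behavior.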