
I've read the dask documentation, blogs and SO, but I'm still not 100% clear on how to do it. My use case:

  • I have about 10 GB of reference data. Once loaded, it is read-only. We usually load it into Dask/Pandas dataframes
  • I need this ref-data to process (enrich, modify, transform) about 500 million events a day (multiple files)
  • The "process" is a pipeline of about 40 tasks. The execution sequence is relevant (dependencies).
  • Each individual task is not complicated or time consuming, mostly lookups, enrichments, mappings, etc.
  • There are no dependencies between the events. In theory I could process every event in a separate thread, combine the output into a single file and be done. The output events don't even need to be in the same order as the input events.

In summary:

  • we can massively parallelize event processing
  • every parallel thread needs the same 10 GB of (raw) ref-data
  • processing a single event means applying a sequence/pipeline of 40 tasks to it
  • each individual task is not time consuming (read the ref-data and modify the event)

Possible pitfalls / issues:

  • spending more time on serialization/deserialization rather than on processing the data (we experienced this in some of our trials which used pipe-like approaches)
  • ref-data being loaded multiple times, once by each (parallel) process
  • preferably I would like to dev/test it on my laptop, but I don't have enough memory to load the ref-data. Maybe the solution could leverage memory maps?

The most efficient solution seems to be to load the ref-data into memory only once, then make it available read-only to the multiple other processes that process the events.

Scale out to multiple computers by loading the ref-data in each computer. Push filenames to the computers for execution.

Any idea how to achieve this?

Thanks a lot for your help

Juergen
  • See if joblib can do the job: https://joblib.readthedocs.io/en/latest/generated/joblib.Parallel.html . It has the ability to use shared memory efficiently with worker processes for large numpy-based data structures. – user238607 Nov 18 '18 at 14:08
  • More details here : https://github.com/joblib/joblib/blob/master/doc/parallel_numpy.rst – user238607 Nov 18 '18 at 14:16
  • The question is about multiple machines. – mdurant Nov 18 '18 at 17:14
  • Multiple machines is not the core question. The core question is: share read-only ref-data across multiple processes that process events in parallel. – Juergen Nov 18 '18 at 20:24

3 Answers


I have also come across a similar issue of running embarrassingly parallel jobs that were all fetching data from the same lookup "reference" table (or any big-memory read-only variable needed by each instance of the parallel process). As long as you stay in an environment which follows "copy-on-write" semantics (e.g. Linux), placing the lookup table in the global scope has always worked very efficiently, as explained nicely here: Shared-memory objects in multiprocessing

Here is a simple parallel workflow:

from multiprocessing import Pool

# Load your reference data only once,
# here in the parent process
my_ref_lookup = load_ref_data(your_data_file)

def your_parallel_function(my_file_path):
    my_new_data = load_data(my_file_path)
    # process my_new_data with some lookups in my_ref_lookup,
    # which is inherited from the parent process

    processed_data = do_stuff(my_new_data)

    # you could write something to disk here
    # and/or return the processed_data

    return processed_data

with Pool(processes=5) as pool:
    list_of_result = pool.map(your_parallel_function, your_list_of_file_paths)

Here your_parallel_function will execute in parallel over e.g. 5 workers, fetching 5 files from your_list_of_file_paths at a time, and all child processes will have access to my_ref_lookup without having to copy it.

After some time spent with Dask and bag collections, I never found a similar or simpler behaviour than this. In my attempts at using Dask, a read-only variable shared this way in the global scope ended up being copied by as many workers as needed it, which exploded the memory and made my kernel crash. I have never seen this case handled in any of the Dask documentation. The only remotely related reference to this in the Dask documentation is about avoiding global state: https://docs.dask.org/en/latest/delayed-best-practices.html#avoid-global-state but this shows the case of the shared variable being modified by the delayed function, which is different from the current issue of just sharing "read-only" data.

Wall-E
  • Thumbs up. It addresses my issue and it's easy to understand. Also thanks for your insight in Dask. – Juergen Feb 11 '20 at 14:28
  • If you ever come across a similarly easy way in Dask, I'll be happy to hear more about it. – Wall-E Feb 11 '20 at 17:09

Some things you can think about

  • each Dask worker process can have any number of threads. Sharing data between threads does not require copying, but sharing between processes does; so you should experiment with the process/thread mix to find what is optimal for you

  • it is generally better to load data in the workers rather than pass it from the client, even though replicating amongst the processes is fairly efficient. If you have the memory to persist the ref-data for every worker, that is obviously best, although Dask tries its best to account for common intermediate dependencies of tasks.

  • every task introduces some overhead, and may result in intermediates being moved from one machine to another. Although some linear chains of processes may be fused at optimisation time, you are probably better off writing a single function that calls your stages in sequence, and calling that function as a single task, once for each part of your data.

Example

f = client.submit(read_function, ref_filename)
out = client.map(process_function, list_of_inputs, ref=f)

where process_function in this example takes one input (which may be a tuple) and an optional ref= keyword argument, which is the loaded ref-data. Dask will replicate the reference data to the workers as required.
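
For completeness, a slightly fuller, hypothetical sketch of the same pattern (not part of the original answer): it assumes a dask.distributed cluster is reachable, and the names load_ref_data, read_events, write_output, PIPELINE_STAGES, the file names and list_of_event_files are placeholders for your own code and data.

from dask.distributed import Client

client = Client("scheduler-address:8786")   # placeholder address; Client() starts a local test cluster

def load_ref_data(ref_filename):
    # read the ~10 GB reference data; this runs on a worker, not on the client
    ...

def process_file(event_filename, ref=None):
    # one Dask task per input file: run the whole ~40-stage pipeline in sequence,
    # so the scheduler sees a single task instead of 40 tiny ones per file
    events = read_events(event_filename)        # placeholder reader
    for stage in PIPELINE_STAGES:               # placeholder list of ~40 callables
        events = stage(events, ref)
    write_output(events)                        # placeholder writer
    return event_filename

ref_future = client.submit(load_ref_data, "ref_data.parquet")             # placeholder file
futures = client.map(process_file, list_of_event_files, ref=ref_future)   # placeholder file list
client.gather(futures)   # block until everything has been processed

Because the whole pipeline runs inside one task per input file, per-task overhead stays small and no intermediates travel between machines; the loaded ref-data is replicated to each worker that needs it and kept there while the future is held.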

mdurant
  • we are using SSDs and our problem is CPU bound. Because of the GIL, threads are not an option. We want to pass only the filenames to the workers, but all workers need the same ref-data - hence the idea to use shared read-only memory. Thanks for your comments. – Juergen Nov 18 '18 at 20:17
  • You *could* access unix shared memory from each worker function, if you know how to do that. – mdurant Nov 18 '18 at 22:30
  • multiple processes can also share memory, check out memory maps. – Jingpeng Wu May 22 '19 at 14:05
  • Yes, I know, I said so: it is called shared memory (not memory map) http://man7.org/linux/man-pages/man7/shm_overview.7.html – mdurant May 22 '19 at 20:36
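
Following up on the shared-memory suggestion in the comments above, here is a minimal, hypothetical sketch (not from the thread) using Python 3.8+ multiprocessing.shared_memory. It assumes Linux, that the ref-data fits in a single NumPy array, and that load_ref_array, process_file and your_list_of_file_paths are placeholders for your own code and data.

from multiprocessing import Pool, shared_memory
import numpy as np

# Placeholder: your ~10 GB reference data as a NumPy array.
ref_array = load_ref_array()

# Copy it once into a named shared-memory block.
shm = shared_memory.SharedMemory(create=True, size=ref_array.nbytes)
shared_ref = np.ndarray(ref_array.shape, dtype=ref_array.dtype, buffer=shm.buf)
shared_ref[:] = ref_array[:]
ref_shape, ref_dtype = ref_array.shape, ref_array.dtype
del ref_array  # the private copy is no longer needed

def worker(file_path):
    # Attach to the existing block by name; nothing is copied.
    existing = shared_memory.SharedMemory(name=shm.name)
    ref = np.ndarray(ref_shape, dtype=ref_dtype, buffer=existing.buf)
    result = process_file(file_path, ref)  # placeholder for the 40-step pipeline
    existing.close()
    return result

with Pool(processes=5) as pool:
    results = pool.map(worker, your_list_of_file_paths)

shm.close()
shm.unlink()  # release the shared block when everything is done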

I've found a blog post about the (Python) Ray framework. Even though Ray's business purpose is very different, they faced the same core requirement: read-only shared-memory dataframes leveraged by many parallel processes. They describe and explain why they settled on Apache Arrow and pyarrow. Sounds interesting and we'll give it a try for our use case.
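
For reference, a minimal, hypothetical sketch of how this could look with Ray's shared-memory object store; the file names, list_of_event_files and the pipeline body are placeholders and are not taken from the blog post.

import ray
import pandas as pd

ray.init()

ref_df = pd.read_parquet("ref_data.parquet")   # placeholder path
ref_handle = ray.put(ref_df)                   # stored once in the shared-memory object store

@ray.remote
def process_file(event_file, ref):
    # `ref` is materialized from the object store; for Arrow/NumPy-backed data
    # this avoids copying the full 10 GB into every worker process.
    events = pd.read_parquet(event_file)
    # ... apply the 40-step pipeline using lookups in `ref` ...
    return len(events)

futures = [process_file.remote(f, ref_handle) for f in list_of_event_files]
results = ray.get(futures)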

Juergen
  • Just a short side note on Ray, if of interest: it requires all code to be fed into the processing from _inside_ a controller node. There is (afaik) no external control option, just in case you might need Ray as a remote processing system. Also, it looks like a one-shot solution; clusters are hardly manageable across multiple runs, because Ray does not clean its Redis storage. – jbndlr May 22 '19 at 20:47