
I have a workstation with 72 cores (actually 36 physical cores with hyper-threading, reported as 72 by multiprocessing.cpu_count()).

I tried both multiprocessing and ray for concurrent processing of batches of millions of small files, and I would like to write some output files concurrently during that processing.

I am confused by the blocking behavior of the .get() methods associated with e.g. apply_async() (in multiprocessing) and ray.get().

With ray, I have a remote function (process_group()) that processes groups of data in parallel within a loop. In what follows, the version of the code using the multiprocessing module is also given as comments.

import ray
import pandas as pd
# from multiprocessing import Pool

ray.init(num_cpus=60)
# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
   ##-----------------------
   ## With ray :
   df_list = ray.get([process_group.remote(data) for data in data_list])
   ##-----------------------
   ## With multiprocessing :
   #f_list = pool.map(process_group, list_of_indices_into_data_list)
   ##
   ##      data are both known from the parent process
   ##      and I use copy-on-write semantic to avoid having 60 copies.
   ##      All the function needs are a list of indices
   ##      of where to fetch slices of the read-only data.  
   ##
   very_big_df = pd.concat(df_list)
   ##-----------------------
   ## Write to file :
   very_big_df.to_parquet(outputfile)

So in each loop iteration, I have to collect the output of the many process_group() tasks, which were computed concurrently, as a list of dataframes df_list for concatenation into one bigger very_big_df dataframe. The latter needs to be written to disk (the sizes are typically ~1 to ~3 GB). Writing one such file takes about 10-30 [s], while the process_group remotes take about 180 [s] to be processed. There are thousands of loop iterations, so this will take several days to complete.

Is it possible to have the file written to disk in a non-blocking manner, while the loop continues in order to save about 10% of the time (that would save about one day of computation)?

By the time the concurrent processes of the next loop iteration finish, there is enough time for the output from the previous iteration to be written. The cores involved here all appear to run at near 100%, so the threading module is probably not recommended either. multiprocessing.apply_async() is even more frustrating, as it does not want my non-picklable very_big_df output dataframe, which I would have to share with some more sophisticated machinery that may cost me the time I am trying to save, and I was hoping ray would handle something like that efficiently.

[UPDATE] For the sake of simplicity, I did not mention that there is a big shared variable among all the processes (which is why I had called it parallel processing, as well as concurrent writing of the file). My title question was edited as a result. So actually, there's this bit of code before the ray parallel jobs:

shared_array_id = ray.put(shared_array)
df_list = ray.get([process_group.remote(shared_array_id, data) for data in data_list])

Not sure though whether that makes it more of a "parallel" execution rather than just concurrent operations.

[UPDATE 2] The shared array is a lookup table, i.e. read-only as far as the parallel workers are concerned.
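
For illustration, here is that pattern in isolation (a minimal, self-contained sketch with made-up names and sizes, not my actual code): the lookup table goes into the object store once with ray.put(), and each task receives the ObjectRef, so Ray hands the workers a read-only view of the array from shared memory rather than 60 pickled copies.

import numpy as np
import ray

ray.init(num_cpus=4)

@ray.remote
def lookup_sum(lookup, row):
    # 'lookup' arrives as a read-only numpy view backed by the shared
    # object store, so no per-worker copy is made.
    return float(lookup[row].sum())

lookup_table = np.random.rand(1000, 1000)   # stand-in for the big lookup table
lookup_ref = ray.put(lookup_table)          # stored once in shared memory

# Pass the ObjectRef, not the array itself; Ray resolves it inside each task.
results = ray.get([lookup_sum.remote(lookup_ref, i) for i in range(8)])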

[UPDATE 3] I tried both proposed solutions: threading, and a Ray remote task for the write. For the latter, it was suggested to make the writing function a remote and dispatch the write asynchronously within the for loop, whereas I originally thought that was only possible through .get(), which would be blocking.

So with Ray, this shows both solutions:

@ray.remote
def write_to_parquet(df_list, filename):
    df = pd.concat(df_list)
    df.to_parquet(filename, engine='pyarrow', compression=None)

# Shared array created outside the loop, read-only (big lookup table),
# about 600 MB.
shared_array_id = ray.put(shared_array)

for data_list in many_data_lists:

   new_df_list = ray.get([process_group.remote(shared_array_id, data) for data in data_list])
   write_to_parquet.remote(new_df_list, my_filename)

   ## Using threading, one would remove the ray decorator:
   # write_thread = threading.Thread(target=write_to_parquet, args=(new_df_list, tinterval.left))
   # write_thread.start()

For the Ray solution, this required increasing object_store_memory; the default was not enough. The default is 10% of node memory, ~37 GB here (I have 376 GB of RAM), which is then capped at 20 GB, while the only objects stored total about 22 GB: one list of dataframes df_list (about 11 GB) and the result of their concatenation inside the writing function (another ~11 GB), assuming there is a copy during concatenation. If there isn't, then this memory issue does not make sense and I wonder whether I could pass numpy views, which I thought was happening by default. This is a rather frustrating aspect of Ray, as I cannot really predict how much memory each df_list is going to take; it can vary from 1x to 3x...
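
For reference, this is roughly how I raised the limit (the value is in bytes; 50 GB is only an illustrative figure, chosen to comfortably exceed the largest df_list plus its concatenated copy):

import ray

# Ask for a bigger object store than the default (value in bytes).
ray.init(num_cpus=60, object_store_memory=50 * 1024**3)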

In the end, sticking to multiprocessing combined with threading happens to be the most efficient solution, as the processing part (without I/O) is faster:

import threading
from multiprocessing import Pool

# Create the shared array in the parent process & exploit copy-on-write (fork) semantics
shared_array = create_lookup_table(my_inputs)

def process_group(my_data):
   # Process a new dataframe here using my_data and some other data inside shared_array
   ...
   return my_df


n_workers = 60
with Pool(processes=n_workers) as pool:
   for data_list in many_data_lists:
      # data_list contains thousands of elements. I choose a chunksize of 10
      df_list = pool.map(process_group, data_list, 10)
      write_thread = threading.Thread(target=write_to_parquet, args=(df_list, tinterval.left))
      write_thread.start()

At each loop iteration, typically len(data_list) = 7000 and each element is a list of 7 numpy arrays of size (3, 9092). So these 7000 lists are sent across the 60 workers:

Time for all parallel process_group calls per loop iteration:

- Ray: 250 [s]
- Multiprocessing: 233 [s]

I/O: it takes about 35 [s] for a 5 GB parquet file to be written on an external USB 3 spinning disk, and about 10 [s] on the internal spinning disk.

Overhead of dispatching the write:

- Ray: ~5 [s] for creating the future with write_to_parquet.remote(), which blocks the loop. That is still 50% of the time it would take to write the file on the internal spinning disk. This is not ideal.
- Multiprocessing: 0 [s] overhead measured.

Total wall times:

- Ray: 486 [s]
- Multiprocessing: 436 [s]

I iterated this a few times; the differences between Ray and multiprocessing consistently show multiprocessing faster by ~50 s. This is a significant difference, and also puzzling since Ray advertises higher efficiency.

I will run this for a larger number of iterations and report back on stability (memory, potential garbage collection issues, ...).
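
One variant I may try for that, following the suggestion in the comments to use a fixed-size pool of writer threads instead of bare, unjoined Thread objects (a sketch only; it assumes the undecorated write_to_parquet(df_list, filename) helper, and my_filename stands in for the per-iteration output path):

from concurrent.futures import ThreadPoolExecutor
from multiprocessing import Pool

writer = ThreadPoolExecutor(max_workers=1)   # at most one write in flight
pending_write = None

with Pool(processes=60) as pool:
   for data_list in many_data_lists:
      df_list = pool.map(process_group, data_list, 10)
      if pending_write is not None:
         pending_write.result()   # join the previous write so its df_list can be freed
      pending_write = writer.submit(write_to_parquet, df_list, my_filename)

if pending_write is not None:
   pending_write.result()   # make sure the last file is on disk
writer.shutdown()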

Wall-E
    Writing to a file is an I/O-bound operation, and should use very little CPU time, so writing the file in a background thread seems like it could work for you here. It also releases the GIL, so it won't interfere with your other processing at the Python-level, either. – dano Feb 13 '20 at 03:55
  • So do you suggest to mix my ray workflow with the use of Threading? – Wall-E Feb 13 '20 at 15:56
  • **A ) the `pickle` issue** : better try Mike McKerns' **substitute**, `dill`, best used by simply replacing the import as **`import dill as pickle`** ( it can `pickle.dump` a lot of structures the standard module cannot ( it can even **statefully { save | load } whole python sessions**, which is great for working with complex models, having replicable progressive snapshots, and many other life-saving tricks et al ) **B ) the PERFORMANCE** : would you mind updating your post with a copy of the actual computing device, as reported by `lstopo-no-graphics -.ascii` ? – user3666197 Feb 13 '20 at 16:12
  • @user3666197 the result of that command is HUGE in my terminal. Not sure how you want me to update my post with that info. It doesn't even fit in a screenshot. (the list is long, there are 72 cores after all to describe...) – Wall-E Feb 13 '20 at 16:19
  • Sure, as expected, copy/paste works fine, scrolling here either, + can make a pdf or other formats, if feeling a need. Ref.: `man lstopo` **BTW** a nice coronal ejecta video. The remark about shared dataframe is extremely important, not for a wish to have an interleaved computing-block with a call to a `.to_parquet()` method I/O-block ( a common trick of latency masking ), but **due to the costs of** process-to-process **sharing** you have to pay without any obvious reason for doing that ( what benefit do your processing get from the must to keep the shared variable communicated, synced etc ?) – user3666197 Feb 13 '20 at 16:26
  • @user3666197 I got reminded that I am not authorized to share the details of this hardware. really sorry... – Wall-E Feb 13 '20 at 16:30
  • @user3666197 wasn't clear if your question was rhetorical but I kept the big dataframe shared this way because it would explode the memory to have it copied by as many workers. That shared variable can be up to 5 GB. – Wall-E Feb 13 '20 at 16:35
  • Never mind, confidentiality is fully understandable. Full respect to your supervisors for their service, indeed. Anyway, now you see the whole landscape. Next, not all CPU-cores might be permitted for python process to get used ( permissions and affinity mappings ). Next, not all such CPU-cores may get served to you on an exclusive basis ( may see details about the actual share of the so called **CPU work-stealing cycles**, if this device is but an abstract projection of a cloudy-dream, one may easily get less than 60% of the actual CPU-cycles, the rest is spent not on your processing... ). – user3666197 Feb 13 '20 at 16:45
  • Next, ***the sin of sharing***. Given you aim at HPC-grade computing, always do balance the costs against benefits (if any). Having occupied 300 GB of RAM may get way faster, than making all 60-workers stay in a queue for a signalled access, wait-till-confirmed and SER/DES-re-processing the "shared"-constant, the worse a shared-variable. **This is the core bottleneck**, not the file-I/O ( that may and shall get easily latency-masked ). If dependency-tree(s) permit, avoid sharing, the more in python ( where the add-on costs of a GIL-lock dancing is expensive in any-hundreds-of-CPU-core devices ) – user3666197 Feb 13 '20 at 16:49
  • @user3666197 Thank you for your understanding. I can only tell you that this is my "own" workstation. It is not cloud-based, a cluster, or the like. It's just a beefy one: 72 cores, Ubuntu OS. When I run the ray jobs with the 60-core setup, I see all 60 cores lighting up at 100% in the system monitor. Let me see what happens if I don't share the variable and have it copied; I'll choose one that still fits in my memory... That doesn't address the main issue though: writing the file to disk concurrently. Let me try the threading option. Then I'll try with pathos.multiprocessing / the dill project. – Wall-E Feb 13 '20 at 16:53
  • @Wall-E Yes, it seems like you could move the file write to a Thread (you could even use a thread pool of size 1 if you want to ensure only one write happens at a time), to allow the loop to keep executing in parallel to the write. – dano Feb 13 '20 at 16:57
  • @user3666197 let me fix also my wording. The shared "variable" that is up to 5GB, it is read-only, it's just a big lookup table (dataframe, or numpy array). I guess that's what you meant by "shared-constant". – Wall-E Feb 13 '20 at 16:59
  • `Threading` module will kill your computing phase, as all 60-workers will not work but in a GIL-lock re-[SERIAL]-ised ( in a purely SEQ-uential fashion (!), indeed a bad idea if speaking about performance ). The `file-I/O` concurrency ( latency-masking ) has not to rely on python GIL-lock but the very opposite. Better "spit" the few GB each time over IPC or even across a local network to another, independent process, responsible for `file-I/O`, than blocking the computing-path on the HPC-grade dependency-graph. – user3666197 Feb 13 '20 at 17:00
  • @user3666197 The thread would only be used to write the file, which releases the GIL. – dano Feb 13 '20 at 17:00
  • @dano negative, Sir. It will first spend ages on re-formatting row-data into columnar data, do a compression and awfully lot of CPU-related processing, before any Hadoop Parquet file gets first I/O-touched. Not a good idea for python GIL-locked computing. – user3666197 Feb 13 '20 at 17:02
  • @Wall-E you ought read the python process-based concurrency also in terms of CPU-cache-hierarchy efficiency. Given each of the 60-workers ( settled on one's own CPU, feeding the core-local L1/L2/L3-caches ( ref. your `lstopo` map details ) ), the performance gets improved by re-reading data from cache **at a cost of a few `[ns]`**, not from RAM ~ small hundreds of `[ns]`, the less "across" a re-requested Sharing-mediated remote-data request + SER/DES at a repetitive costs of many, GIL-lock dependent (!) `[ms]` - indeed an awfully bad idea for fast and smart data-processing pipeline, isn't it? – user3666197 Feb 13 '20 at 17:10
  • @user3666197 Ah, you mean that this call: `very_big_df.to_parquet(outputfile)` is doing a bunch of pre-processing before it actually gets to the write? Then I agree that means the GIL will be held for significant amounts of time in the background thread. I'm not clear on how that interferes with the `process_group` work though, since that is running in separate processes that don't share the GIL. It seems sending the data to pre-process/write over the network to another system is the only way to avoid having the `to_parquet` call competing with the computation work for CPU cycles. – dano Feb 13 '20 at 17:22
  • @Wall-E to get some sense of the actual costs all your CPUs pay, no matter what form & shape of the **data-hydraulics** flow selected (having different tasks to complete,if ill-designed (i.e. sharing & that added inter-process signalling) ) review the CPU-core related **mandatory expenses** for a best-case scenario (i.e. an exclusive,monopolistic) operations -- https://stackoverflow.com/a/59750841/ + add the many multiples thereof, in case GIL-locks dances step in & govern when + who gets its turn( + **all others will wait** & 've to ask again,after some time,if they might get a piece of data) – user3666197 Feb 13 '20 at 17:59
  • @user3666197 I first tried not using the shared_array and instead sending each worker its own copy. That barked with ```ray.exceptions.ObjectStoreFullError: Failed to put object (blah) in object store because it is full. Object size is 839902 bytes.``` I am soon posting a comparison of solutions including the Threading option that @dano suggested, which does what I wanted. – Wall-E Feb 13 '20 at 19:08
  • I have updated my post to compare all solutions discussed. – Wall-E Feb 13 '20 at 22:56
  • @dano A new issue came up when running the multiprocessing + Threading on the full-size arrays. Memory keeps increasing as the loop iterates, as if the dataframes created in each iteration are not freed when the next begins and once they have been written to disk by the writing thread. It should not; it should plateau. Any chance the memory is not freed properly in this kind of implementation? – Wall-E Feb 14 '20 at 14:38
  • @Wall-E I guess it could be because you're creating a new Thread object on every iteration, and then never joining them. You could try using `multiprocessing.dummy.Pool` (which is a `ThreadPool`), or `concurrent.futures.ThreadPoolExecutor`, so that you have a fixed number of threads to do the writes, and see if that helps. – dano Feb 14 '20 at 14:53
  • Alternatively, would it make sense to join them after the next iteration, and right before I start the next writing thread? – Wall-E Feb 14 '20 at 16:03

1 Answer


Have you considered assigning 1 core to a Ray task that writes data into a file?

[UPDATE] Prototype

import ray
import pandas as pd
# from multiprocessing import Pool

ray.init(num_cpus=60)

@ray.remote
def write_to_parquet(data, filename):
    # Minimal version: just write once. In practice you may want to retry
    # until it succeeds and record failed writes somewhere; I assume failure
    # to write is uncommon. You could also ray.put() the data and have one
    # background process that keeps retrying the failed writes.
    data.to_parquet(filename)

# with Pool(processes=n_workers) as pool:
for data_list in many_data_lists:
   ##-----------------------
   ## With ray :
   df_list = ray.get([process_group.remote(data) for data in data_list])
   ##-----------------------
   ## With multiprocessing :
   #f_list = pool.map(process_group, list_of_indices_into_data_list)
   ##
   ##      data are both known from the parent process
   ##      and I use copy-on-write semantic to avoid having 60 copies.
   ##      All the function needs are a list of indices
   ##      of where to fetch slices of the read-only data.  
   ##
   very_big_df = pd.concat(df_list)
   ##-----------------------
   ## Write to file :

   write_to_parquet.remote(very_big_df, filename)
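
If you also want to make sure a write finished (or keep at most one write outstanding) without blocking on every iteration, you can keep the returned ObjectRef and wait on the previous one before issuing the next; a rough sketch along the same lines, using ray.wait():

pending_write = None
for data_list in many_data_lists:
   df_list = ray.get([process_group.remote(data) for data in data_list])
   very_big_df = pd.concat(df_list)
   if pending_write is not None:
      ray.wait([pending_write])   # wait until the previous write has finished
   pending_write = write_to_parquet.remote(very_big_df, filename)

if pending_write is not None:
   ray.get(pending_write)   # surfaces any exception from the last write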
Sang
  • Sure, but the documentation says it's a blocking operation each time you call get(), so I'm confused; that seems to defeat the purpose. – Wall-E Feb 13 '20 at 15:55
  • Not sure if I understood you correctly, but you don't need to call ray.get() to make a remote task work. Once you call .remote(), it is asynchronously scheduled and runs in the cluster. `ray.get()` is used to get the object (typically a return value) of remote tasks from the object store. If what you meant was that you want to make sure the write succeeds using `ray.get()`, you can have a background thread that periodically checks the result of your task and reschedules it if it fails. To achieve this, you can have a while loop and use `ray.wait()`. – Sang Feb 13 '20 at 18:32
  • I thought .remote() was just creating a future, returning some task ID that we're meant to compute later with .get(). Let me try what you say without calling get() and see if the file gets written even if I don't call .get() on this task ID. – Wall-E Feb 13 '20 at 19:13
  • So not calling get() and using a file writing function e.g. as ```write_to_parquet.remote(data, filename)``` did create the file. But it came with a 10% overhead (with respect to file write time) that blocked the loop for 1-2s. Not bad. But the threading option discussed above has undetectable overhead (order of ns). Not sure what is cleaner though. Thoughts? – Wall-E Feb 13 '20 at 19:33
  • After running this on the full size arrays, this actually failed. The object sent to the remote writing function was too big. So this does not look like an ideal solution. – Wall-E Feb 13 '20 at 19:56
  • "But it came with a 10% overhead (with respect to file write time) that blocked the loop for 1-2s" -> Does the loop mean your for loop for `many_data_lists`? Since the remote function just returns the future, it theoretically should not block the loop. I was thinking this sort of prototype code. – Sang Feb 13 '20 at 19:58
  • You can increase the object store memory size by modifying `object_store_memory`: `ray.init(object_store_memory=[bigger memory than the max size of your data frame])` https://ray.readthedocs.io/en/latest/package-ref.html Ray supports garbage collection for objects that are not used (not fully yet). Note that if your code is not in the scope of Ray garbage collection, you can still have OOM issues. Check here for more details: https://stackoverflow.com/questions/60175137/out-of-memory-with-ray-python-framework – Sang Feb 13 '20 at 20:03
  • Ok, that worked. But I have now tried on the full-size arrays. The overhead of creating the remote() to write asynchronously is about 5 s, which blocks the for loop over ```many_data_lists```. That's a lot: it takes about 10 s to write the file on the internal disk, so that's 50% overhead, which kind of defeats the purpose. I guess that's because what I send in there is rather big (~10 GB)? Threading does not have this issue. – Wall-E Feb 13 '20 at 22:54
  • Okay. I guess it takes a lot of time to pass large data with Ray, so having a remote function write a huge file is not a good idea. Also note that you can use the threading approach with Ray as well. – Sang Feb 14 '20 at 00:36
  • Also, one last thing to mention: Ray is not a competitor of the multiprocessing library. The multiprocessing library works only on a single machine, but Ray is for distributed computing with many machines (which means it is highly scalable). For example, Ray has a library like this https://ray.readthedocs.io/en/latest/multiprocessing.html, which provides exactly the same API as the multiprocessing library but scales from a single machine to a cluster seamlessly. – Sang Feb 14 '20 at 00:40
  • I didn't consider Ray as a "competitor"; that's not the point of this post. But as a scientist, when I face a problem, I have to compare the different methods that get me to the same results. Multiprocessing and Ray were the 2 solutions that worked best. I didn't put Dask in here, but I spent two weeks doing this with Dask, and it wasn't great for this one. Thanks to these "comparisons", I'm better able to choose which one I need for future problems. I'll definitely use Ray again. – Wall-E Feb 14 '20 at 00:48
  • There seem to be memory issues with the Multiprocessing + Threading method; they came up on my production run. The memory does not seem to be freed after the files are written: either the list of small dataframes, or the big dataframe, or both, aren't freed. I will try Ray on this production run, since you say it supports garbage collection, and see if it handles the memory better. – Wall-E Feb 14 '20 at 14:43
  • Sounds good. If you want better support or information, I recommend you join the Ray public Slack. You can find the invitation link at the bottom of the README of the ray repo: https://github.com/ray-project/ray/blob/master/README.rst – Sang Feb 14 '20 at 16:33