
I am trying to split a large file into smaller pieces. I read all the data from the large file first and then use multiprocessing to write it out to several smaller data files.

Here is method one, where I use multiprocessing.Process to start the processes. It works well:

import math
import multiprocessing
import time


def split_data_with_process(filepath, chunk_num):
    # A nested function can be a Process target here because Linux uses the fork start method.
    def write(write_data, index, name, prev_path, suffix):
        print("enter")
        start_time = time.time()
        with open(prev_path + f"/{name}_{index}.{suffix}", "w", encoding="utf-8") as f:
            f.writelines(write_data)
        print(time.time()-start_time)

    prev_path, filename = filepath.rsplit("/", maxsplit=1)
    name, suffix = filename.split(".")
    with open(filepath, "r", encoding="utf-8") as f:
        totalList = f.readlines()
    chunksize = math.ceil(len(totalList) / chunk_num)
    data = [(totalList[start:start + chunksize], index) for index, start in
            enumerate(range(0, len(totalList), chunksize))]
    tasks = []
    start_time = time.time()
    for each in data:
        task = multiprocessing.Process(target=write, args=(each[0], each[1], name, prev_path, suffix))
        task.start()
        tasks.append(task)
    for each in tasks:
        each.join()
    end_time = time.time()
    print(end_time - start_time)

and the output is

enter
enter
enter
enter
enter
7.192562818527222
8.827389001846313
9.067991018295288
9.476916313171387
7.729929208755493
15.109729290008545

Then I tried to rewrite the code with ProcessPoolExecutor, and the code looks like this:

from concurrent.futures import ProcessPoolExecutor


def write(args):
    lines, index, prev_path, name, suffix = args
    print("enter")
    start_time = time.time()
    with open(prev_path + f"/{name}_{index}.{suffix}", "w", encoding="utf-8") as f:
        f.writelines(lines)
    print(time.time() - start_time)
    return len(lines)


def split_data_with_process_2(filepath, chunk_num):
    prev_path, filename = filepath.rsplit("/", maxsplit=1)
    name, suffix = filename.split(".")
    with open(filepath, "r", encoding="utf-8") as f:
        totalList = f.readlines()
    chunksize = math.ceil(len(totalList) / chunk_num)
    data = [(totalList[start:start + chunksize], index, prev_path, name, suffix) for index, start in
            enumerate(range(0, len(totalList), chunksize))]
    start_time = time.time()
    with ProcessPoolExecutor(max_workers=chunk_num) as pool:
        result = pool.map(write, data)
    print(sum(result))
    end_time = time.time()
    print(end_time - start_time)

The second way takes much longer than the first. It looks like the different processes are working serially instead of in parallel. The output is:

enter
3.416102170944214
enter
3.3221476078033447
enter
3.198657989501953
enter
3.453885316848755
enter
3.261833429336548
16149274
42.55615472793579

So what is the problem here?

  • I'm not sure, but I think it is because ProcessPoolExecutor sets the number of processes dynamically. How many CPUs do you have on your system? – tturbo Oct 13 '22 at 07:11
  • Also, ProcessPoolExecutor has a lot of overhead, and you are mainly doing I/O-bound operations, which do not have a strong need for multiprocessing. Maybe you would achieve way better performance with [async file I/O](https://www.twilio.com/blog/working-with-files-asynchronously-in-python-using-aiofiles-and-asyncio) (see the sketch after these comments)? – tturbo Oct 13 '22 at 07:12
  • It's not clear that even multithreading or asyncio would improve performance. You would still be writing multiple files concurrently, and your drive, unless it is solid state, might be doing more head movement back and forth among the files you are writing as a result. – Booboo Oct 13 '22 at 11:04
  • Yes, I also doubted that multiprocessing would help because this is mainly I/O bound, but my test shows that multiprocessing does reduce the time cost compared to doing everything in a single process. I also tried a multithreading version and it took more time than the single-threaded method. @Booboo – pythonHua Oct 17 '22 at 06:36
  • I am running the code on Linux with [GCC 10.3.0] Python 3.9.12. The number of CPUs is 44 according to the top command, and I set max_workers=5 in my code above. What I find strange is that it looks like the different processes are working serially instead of in parallel when I use ProcessPoolExecutor. @tturbo – pythonHua Oct 17 '22 at 06:42
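
A minimal sketch of the async file I/O idea from the comment above, assuming the third-party aiofiles package is installed; the function names (write_chunk_async, write_chunks_async) are illustrative and not from the question:

import asyncio
import aiofiles  # assumed third-party dependency: pip install aiofiles


async def write_chunk_async(lines, index, prev_path, name, suffix):
    # aiofiles offloads the blocking file write to a worker thread.
    async with aiofiles.open(f"{prev_path}/{name}_{index}.{suffix}", "w", encoding="utf-8") as f:
        await f.write("".join(lines))


async def write_chunks_async(chunks, prev_path, name, suffix):
    # Schedule all chunk writes concurrently and wait for them to finish.
    await asyncio.gather(*(write_chunk_async(lines, index, prev_path, name, suffix)
                           for lines, index in chunks))


# usage, reusing the (chunk, index) pairs built in the question's first snippet:
# asyncio.run(write_chunks_async(data, prev_path, name, suffix))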

1 Answer


Updated answer:

ProcessPoolExecutor has far more overhead than using Process alone, because ProcessPoolExecutor employs Futures and overall offers more features, such as the ability to cancel tasks, check their status, obtain results, etc.

Related to this, Process does not pickle a return value at all. While that is not the majority of the time increase you are seeing, it is not insignificant. If you were to, for example, employ a queue to return a value, you would see overhead added to the Process approach as well.

So overall, Process is leaner and in many ways more directly an OS construct, but it does not have the features of Futures, which themselves require overhead to manage.
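
One rough way to see that overhead in isolation, a sketch of my own rather than anything from your code, is to time both approaches on a task that does essentially nothing, so the difference is mostly per-process and Future bookkeeping:

import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor


def trivial(_):
    # No real work, so the timings mostly reflect process/pool management overhead.
    return None


if __name__ == "__main__":
    n = 5

    start = time.time()
    procs = [multiprocessing.Process(target=trivial, args=(i,)) for i in range(n)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print("Process:", time.time() - start)

    start = time.time()
    with ProcessPoolExecutor(max_workers=n) as pool:
        list(pool.map(trivial, range(n)))
    print("ProcessPoolExecutor:", time.time() - start)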

The following other Stack Overflow question/answer is related.

Stack Overflow #18671528

The recommendation in that other Stack Overflow answer makes sense: use ProcessPoolExecutor when the benefits of Futures are desired, perhaps for longer-running tasks than yours. Note that the other question is submitting many more tasks than yours and is seeing a much larger/more significant performance impact (it has thousands and thousands of outstanding Futures).
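
For reference, those Future benefits (status checks, results, cancellation) look roughly like this when using submit(); the task function here is a placeholder, not something from your code:

from concurrent.futures import ProcessPoolExecutor, as_completed


def task(x):
    # Placeholder work standing in for a longer-running job.
    return x * x


if __name__ == "__main__":
    with ProcessPoolExecutor() as pool:
        futures = [pool.submit(task, i) for i in range(10)]
        futures[-1].cancel()  # may or may not succeed, depending on whether it already started
        for fut in as_completed(futures):
            if not fut.cancelled():
                print(fut.done(), fut.result())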

Some additional detail which you might find interesting

Some (not all) of the overhead can be inferred from the design comments at the top of CPython's Lib/concurrent/futures/process.py file...

ProcessPoolExecutor design comments

It shows that your submitting thread (the thread calling map) enqueues a work item, which is dequeued by an internal worker thread, which itself enqueues an item onto the "Call Q"... all of that adds latency that Process does not incur.

Along those lines, if you look at the details of Future...

class Future

...you will see use of condition variables and quite a bit of code around managing them.

To pin down specifically where in CPython the overhead occurs would require a more detailed performance analysis, which I'll omit. Offhand, given the internal thread and the condition variables the parent process must navigate, I would not be surprised if there were a large parent-process, GIL-related impact... but I would prefer a detailed analysis before saying so for certain.

Generally, you pay more cost in time for features gained with ProcessPoolExecutor. Which approach you choose depends on your needs, how long the subprocesses will execute, whether you need results, features of Futures, and other considerations.

Original answer:

Addendum: The original suggestion was to remove the possibility of using too many workers by omitting chunk_num from the call to ProcessPoolExecutor(). @pythonHua (OP) has confirmed the OS is Linux and the number of chunks is only 5, so too many workers is not the issue. Also, a typo is fixed... chunksize was incorrectly used in the commentary instead of chunk_num.


Your question is incomplete in that it does not indicate file sizes, line sizes, and chunk sizes, etc.

I assume you are on Linux with large chunk sizes. I suspect you are unnecessarily saturating ProcessPoolExecutor with more workers than makes sense.

There is no direct correlation between chunk_num and the number of workers you should select. You must take into account your hardware or let ProcessPoolExecutor make the choices for you.

So I see a potential issue with your use of ProcessPoolExecutor. If you have numerous chunks, you will end up with too many workers which will unnecessarily slow things down.

Try removing your max_workers initializer (i.e., use None and let ProcessPoolExecutor choose the defaults for you), or try changing it to something around the number of logical processors you have on your system or less.

i.e., change this...

ProcessPoolExecutor(max_workers=chunk_num)

...to something like one of the following...

ProcessPoolExecutor()

ProcessPoolExecutor(max_workers=<num_logical_processors_or_less>)

In one of my projects, I use the following to limit max_workers. While this is based on my project's needs, you can apply whatever logic works for your project while also keeping your hardware in mind. In my case, I wanted no more than 15 workers regardless of cpu_count() / 2...

min(os.cpu_count() // 2, 15)
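
Applied to your code, that kind of cap might look like the following (the 15 here is just my project's choice; write and data are the names from your second snippet):

import os
from concurrent.futures import ProcessPoolExecutor

# Cap the worker count by the hardware rather than by chunk_num alone.
max_workers = min(os.cpu_count() // 2, 15)
with ProcessPoolExecutor(max_workers=max_workers) as pool:
    result = pool.map(write, data)
    print(sum(result))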

Note, I assume you are using Linux because Windows does not have fork, and its ProcessPoolExecutor implementation currently caps max_workers at 61. With excessive max_workers I do not see the performance hit on Windows, but I certainly do on Linux. So I bet you are specifying too many workers, based on the incorrect assumption that there is a direct correlation between the number of work items you have and the number of workers, without regard to processors or hardware. ProcessPoolExecutor is there to manage a pool of available sub-processes for you, which is very often fewer processes than the total count of work you have to perform over time.

If this is not helpful, provide more info to clarify the undefined characteristics of the problem.

Ashley
  • My code is running on [GCC 10.3.0] on Linux with Python 3.9.12. I set max_workers = chunk_num, not chunksize; maybe you misread the code? chunksize is used for splitting the original data into smaller pieces, and max_workers is set to chunk_num. My goal is to split the original data file into chunk_num smaller data files, so I set max_workers to chunk_num (which is 5 in my example), and I believe there is no point in making max_workers larger. – pythonHua Oct 17 '22 at 06:23