
I'm developing a script to analyze data that is already available in an HDF5 file. The data can range from 10 GB to a terabyte.
I've written the script in Python, but the entire flow is quite slow.

The flow:
I read from the HDF5 file, which has a 3D dataset with a shape such as (100, 70001, 30000). Then I run calculations, represented here as magic_calculation. Finally, I create a new HDF5 file and write all the results to it; the output dataset has a shape of (2, 70000, 30000).
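Stripped of all the parallelism, the flow is essentially the following (a simplified sketch: utils.read_hdf5/utils.write_hdf5 are thin wrappers around h5py.File, magic_calculation produces one value per (t, t+1) pair, sequential_flow is just a placeholder name, and d is the per-z running history mentioned below):

import h5py                       # assuming the utils wrappers are built on h5py
from maths import magic_calculation


def sequential_flow(read_file, the_set, write_file, dataset_out):
    with h5py.File(read_file, "r") as fin, h5py.File(write_file, "w") as fout:
        ds_in = fin[the_set]                      # shape (100, 70001, 30000)
        n_t = ds_in.shape[1] - 1                  # 70000 (t, t+1) pairs
        n_z = ds_in.shape[2]                      # 30000
        ds_out = fout.create_dataset(dataset_out, shape=(2, n_t, n_z), dtype="float32")
        for z in range(n_z):
            column = ds_in[:, :, z]               # the (100, 70001) slab for this z
            result1, result2, d = [], [], 0
            for t in range(n_t):
                magic = magic_calculation(column[:, t], column[:, t + 1], z)
                result1.append(magic + d)         # running value d carried across t
                result2.append(magic)
                d = magic + d
            ds_out[0, :, z] = result1
            ds_out[1, :, z] = result2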

My current architecture is as follows:
The main process creates two threads, one to read from the input file and one to write to the output file.
Then X processes and X queues are created with multiprocessing. The reading thread populates the X queues.
Each process gets data from its queue, does the calculations, and puts the result data into a shared out queue.
The writing thread gets results from the out queue and writes them to the file.
The X queues exist because each process needs to keep a calculation history per z index (the third axis of (x, y, z)).

The problem I'm facing: it's quite slow.

A code sample:

import utils
from maths import magic_calculation
import multiprocessing
import threading
import copy
import os


def read_thread(read_file, the_set, y, queue_dict):
    # y holds the t indices; queue/worker i gets the z columns with z_col % queue_len == i.
    queue_len = len(queue_dict)
    with utils.read_hdf5(read_file) as file:
        ds = file[the_set]
        for z in range(30000 // queue_len):  # 30000 z columns split across the queues
            for t in y[:-1]:
                for queue_count, queue in enumerate(queue_dict.values()):
                    z_col = z * queue_len + queue_count
                    data1 = ds[:, t, z_col]
                    data2 = ds[:, t + 1, z_col]
                    queue.put([data1, data2, z_col])
        for queue in queue_dict.values():
            queue.put('STOP')  # every worker needs its own sentinel

def write_thread(write_file, ds, queue, n_workers):
    # Single writer: keep collecting results until every worker has sent its 'STOP'.
    stops = 0
    with utils.write_hdf5(write_file) as file:
        while stops < n_workers:
            result = queue.get()
            if result == 'STOP':
                stops += 1
                continue
            z = result[2]
            file[ds][0, :, z] = result[0]
            file[ds][1, :, z] = result[1]
    print("DONE!")


def the_process(in_queue, out_queue):
    # Keeps a running value d for the current z column and flushes the results whenever z changes.
    result1 = list()
    result2 = list()
    d = 0
    z = None
    while True:
        data = in_queue.get()
        if data == 'STOP':
            if result1:
                out_queue.put([result1, result2, z])
            out_queue.put('STOP')
            break
        if z is None:
            z = data[2]
        if data[2] != z and result1:
            out_queue.put(copy.deepcopy([result1, result2, z]))
            result1.clear()
            result2.clear()
            d = 0
            z = data[2]
        magic = magic_calculation(data[0], data[1], data[2])
        result1.append(magic + d)
        result2.append(magic)
        d = magic + d

def main(read_file, the_set, write_file, dataset_out, y):
    number_of_cores = multiprocessing.cpu_count()
    queue_size = 100

    queues = [multiprocessing.Queue(maxsize=queue_size) for i in range(number_of_cores)]
    queue_ids = ["Queue{}".format(i) for i in range(number_of_cores)]
    queue_dict = dict(zip(queue_ids, queues))
    out_queue = multiprocessing.Queue(maxsize=queue_size)

    processes = [multiprocessing.Process(target=the_process, args=(queue_dict[queue_id], out_queue))
                 for queue_id in queue_ids]

    reading = threading.Thread(target=read_thread, args=(read_file, the_set, y, queue_dict))
    writing = threading.Thread(target=write_thread,
                               args=(write_file, dataset_out, out_queue, number_of_cores))

    reading.start()
    for process in processes:
        process.start()
    writing.start()

    reading.join()
    for process in processes:
        process.join()
    writing.join()

What do you think? Should I have a different architecture for this case? Or can I make some adjustments to this one? Where should I start to make it faster?

  • I would start with profiling. How much time does the program spend on reading from the file, writing to the file, calculating in each process, putting to and getting from the queue? What is the level of CPU and IO utilization? The answers to these questions would show you where the optimization is needed in the first place. – bartolo-otrit Apr 26 '23 at 14:57
  • Ditto what @bartolo-otrit said. You are reading A LOT of data -- 1.53 TiB for float64 with shape (100, 70001, 30000) and writing 31 GiB. I suspect most of the time is spent on I/O (especially if you have a mechanical disk). Also, if I understand `read_thread()`, you loop on axis 2 and 1 (as z and t) to read, calculate and write a row of individual elements. This is a very slow way to read/write HDF5 files. It would be much better to work on a larger slice (say z=0-100 or 1000). – kcw78 Apr 26 '23 at 15:24
  • Also, I am surprised your multithreaded write process works with h5py. By default h5py supports SWMR: Single Writer, Multiple Reader. You can read HDF5 files in parallel with h5py as-is. However, to write HDF5 files in parallel, both HDF5 and h5py must be compiled with MPI support turned on, and you need to use the `mpi4py` package. The steps are described here: [Parallel HDF5](https://docs.h5py.org/en/stable/mpi.html#parallel-hdf5) – kcw78 Apr 26 '23 at 15:41
  • Correction for SWMR: h5py does not release the global interpreter lock (GIL), so multithreading is pointless with h5py. SWMR works with multiple processes. Easiest solution: let each of your worker processes read in parallel from the file, let the main process coalesce and write the output (a sketch of this layout follows these comments). – Homer512 Apr 27 '23 at 06:07
  • Also just to be clear: You're not using compression, especially on the output, right? And your datasets are contiguous, not chunked? – Homer512 Apr 27 '23 at 06:13
  • Which chunk size and chunk cache size are you using? This can have a huge influence on performance. https://stackoverflow.com/a/48405220/4045774 https://stackoverflow.com/a/44961222/4045774 (h5py_cache is not needed anymore, see first answer) – max9111 Apr 27 '23 at 08:38
  • In terms of profiling, the queueing is the bottleneck, from what I could decipher. So now I'm looking for a solution with SWMR, as mentioned, without threading: multiprocessing reads the data from the h5py file, and the writing is done from the main process. It's still relatively slow. But I do have compression and chunked data: (dtype='float32', chunks=500, compression="gzip", compression_opts=9, shuffle=True, fletcher32=True). A mistake? – Makhaos Apr 28 '23 at 16:05
  • The chunk shape needs to fit the access pattern for good performance. HDF5 reads/writes the entire chunk every time a single element in it is accessed. And yeah, compression is expensive. Not so much reading, but writing. Unless I'm mistaken, fletcher32 is redundant since gzip includes a CRC. – Homer512 Apr 30 '23 at 09:04
  • Compression can be expensive, especially with large values like yours. From my limited experience, going from 1->9 is much slower and you don't gain a lot of compression. I would benchmark writing without compression and/or with `compression_opts=1`. You can compress later with the `h5repack` utility. Ditto about chunk shape -- it is super important. Finally, for better I/O performance, increase the block (slice) sizes to the largest possible size (see the dataset-creation sketch after these comments). – kcw78 May 03 '23 at 14:15
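To make the suggestions above concrete, here is one possible shape for the layout Homer512 and kcw78 describe: each worker process opens the input file itself with h5py and reads whole z columns, while the main process stays the single writer. This is a sketch, not a tuned implementation; the names worker and run, the queue size, and the slab boundaries are illustrative, and magic_calculation stands in for the real computation as in the question.

import multiprocessing

import h5py
import numpy as np

from maths import magic_calculation


def worker(read_file, the_set, z_start, z_stop, out_queue):
    # Each worker opens the input file itself, so reads happen in parallel
    # without threads and without a parallel-HDF5 (MPI) build.
    with h5py.File(read_file, "r") as fin:
        ds_in = fin[the_set]                      # shape (100, 70001, 30000)
        n_t = ds_in.shape[1] - 1
        for z in range(z_start, z_stop):
            column = ds_in[:, :, z]               # one large read per z column
            result1, result2, d = [], [], 0
            for t in range(n_t):
                magic = magic_calculation(column[:, t], column[:, t + 1], z)
                result1.append(magic + d)
                result2.append(magic)
                d = magic + d
            out_queue.put((z, np.asarray(result1), np.asarray(result2)))
    out_queue.put(None)                           # one sentinel per worker


def run(read_file, the_set, write_file, dataset_out, n_workers=4):
    n_t, n_z = 70000, 30000                       # sizes taken from the question
    out_queue = multiprocessing.Queue(maxsize=100)
    bounds = np.linspace(0, n_z, n_workers + 1, dtype=int)
    workers = [
        multiprocessing.Process(target=worker,
                                args=(read_file, the_set, bounds[i], bounds[i + 1], out_queue))
        for i in range(n_workers)
    ]
    for w in workers:
        w.start()
    # The main process is the only writer, so plain h5py is enough.
    with h5py.File(write_file, "w") as fout:
        ds_out = fout.create_dataset(dataset_out, shape=(2, n_t, n_z), dtype="float32")
        done = 0
        while done < n_workers:
            item = out_queue.get()
            if item is None:
                done += 1
                continue
            z, r1, r2 = item
            ds_out[0, :, z] = r1
            ds_out[1, :, z] = r2
    for w in workers:
        w.join()

Because each worker holds its own read-only file handle, only the writes are funnelled through the main process, and the per-queue fan-out from the original design disappears entirely.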
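For the output dataset itself, the chunking and compression advice above might translate into something like the following. Again a sketch: "results.h5" and "dataset_out" are placeholder names, the chunk shape assumes the write pattern stays "one full t column per z", and the cache size and compression level need benchmarking, as kcw78 notes.

import h5py

# Bigger chunk cache for this file (64 MiB instead of the ~1 MiB default).
with h5py.File("results.h5", "w", rdcc_nbytes=64 * 1024**2) as fout:
    ds_out = fout.create_dataset(
        "dataset_out",
        shape=(2, 70000, 30000),
        dtype="float32",
        chunks=(1, 70000, 1),        # one chunk per ds_out[i, :, z] write
        compression="gzip",
        compression_opts=1,          # level 1 instead of 9; shuffle/fletcher32 omitted
    )

With chunks=(1, 70000, 1), each ds_out[i, :, z] assignment should fill exactly one chunk, avoiding read-modify-write cycles on partially written chunks; alternatively, compression can be skipped entirely during the run and applied afterwards with h5repack.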

0 Answers