I'm developing a script to analyze data that is already available in an HDF5 file. The data can range from 10 GB to 1 TB.
I've created a script in Python, but the entire flow is quite slow.
The flow:
I read from this HDF5 file, which contains a 3D dataset with a shape such as (100, 70001, 30000).
Then I run calculations on that data, represented here as magic_calculation.
Finally I create a new HDF5 file for the results, which ends up holding a dataset of shape (2, 70000, 30000). A sketch of the flow without any parallelism follows below.
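Conceptually, the serial version of the flow would look roughly like this (a minimal sketch using h5py directly instead of my utils wrappers; magic_calculation stands in for the real computation, serial_flow and its arguments are just placeholder names, and the output dtype is assumed):

import h5py
from maths import magic_calculation

def serial_flow(in_path, in_set, out_path, out_set):
    with h5py.File(in_path, "r") as fin, h5py.File(out_path, "w") as fout:
        src = fin[in_set]  # shape (100, 70001, 30000)
        dst = fout.create_dataset(out_set,
                                  shape=(2, src.shape[1] - 1, src.shape[2]),
                                  dtype="f8")  # dtype assumed
        for z in range(src.shape[2]):
            column = src[:, :, z]  # one (100, 70001) slab per z
            d = 0
            for t in range(column.shape[1] - 1):
                # third argument is the column index, as in my real code
                magic = magic_calculation(column[:, t], column[:, t + 1], z)
                d += magic
                dst[0, t, z] = d       # running sum over t
                dst[1, t, z] = magic   # raw value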
My current architecture goes as follows:
The main process creates two threads, one to read from the input file and one to write to the output file.
Then X worker processes (multiprocessing) are created, along with X queues.
The reading thread populates the X queues.
Each process gets data from its own queue and does the calculations, then puts the results into a shared output queue.
The writing thread reads from the output queue and writes the results to the file.
There are X separate queues because each process has to keep a calculation history per z column (the data is indexed as (x, y, z)); the small snippet below shows how columns are mapped to queues.
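To make the partitioning concrete: columns along the last axis are handed out round-robin, so each process always sees every time step of "its" columns in order. For example, with 4 queues (the real script uses one queue per core):

num_queues = 4  # example value; the real script uses cpu_count()
for z in range(2):
    for queue_count in range(num_queues):
        column = z * num_queues + queue_count
        print("column {} -> Queue{}".format(column, queue_count))
# column 0 -> Queue0, column 1 -> Queue1, ..., column 4 -> Queue0, column 5 -> Queue1, ...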
The problem I'm facing: it's quite slow.
A code sample:
import utils
from maths import magic_calculation
import multiprocessing
import threading
import copy
import os
def read_thread(read_file, the_set, y, queue_dict):
    queue_len = len(queue_dict)
    with utils.read_hdf5(read_file) as file:
        ds = file[the_set]
        # hand out the columns of the last axis round-robin, one per queue
        # (assumes 30000 is divisible by the number of queues)
        for z in range(30000 // queue_len):
            for t in y[0:len(y) - 1]:
                for queue_count, queue in enumerate(queue_dict.values()):
                    data1 = ds[:, t, z * queue_len + queue_count]
                    data2 = ds[:, t + 1, z * queue_len + queue_count]
                    queue.put([data1, data2, z * queue_len + queue_count])
    # every worker waits on its own queue, so each one needs its own STOP
    for queue in queue_dict.values():
        queue.put('STOP')
def write_thread(write_file, ds, queue, n_workers):
    stops = 0
    with utils.write_hdf5(write_file) as file:
        while True:
            result = queue.get()
            if result == 'STOP':
                # every worker sends its own STOP; only quit once all have finished
                stops += 1
                if stops == n_workers:
                    break
                continue
            z = result[2]
            file[ds][0, :, z] = result[0]
            file[ds][1, :, z] = result[1]
    print("DONE!")
def the_process(in_queue, out_queue):
    result1 = list()
    result2 = list()
    d = 0
    z = None
    while True:
        data = in_queue.get()
        if data == 'STOP':
            if result1:
                # flush the results of the last column before shutting down
                out_queue.put([result1, result2, z])
            out_queue.put('STOP')
            break
        if z is None:
            z = data[2]
        if data[2] != z and result1:
            # a new column has started: ship the finished one and reset the history
            # (deepcopy because the lists are cleared right after)
            out_queue.put(copy.deepcopy([result1, result2, z]))
            result1.clear()
            result2.clear()
            d = 0
            z = data[2]
        magic = magic_calculation(data[0], data[1], data[2])
        result1.append(magic + d)  # running sum over t for this column
        result2.append(magic)
        d = magic + d
def main(read_file, write_file, the_set, dataset_out):
    number_of_cores = multiprocessing.cpu_count()
    queue_size = 100
    queues = [multiprocessing.Queue(maxsize=queue_size) for i in range(number_of_cores)]
    queue_ids = ["Queue{}".format(i) for i in range(number_of_cores)]
    queue_dict = dict(zip(queue_ids, queues))
    out_queue = multiprocessing.Queue(maxsize=queue_size)
    y = range(70001)  # indices along the second axis of the input dataset
    processes = [multiprocessing.Process(target=the_process, args=(queue_dict[queue_id], out_queue))
                 for queue_id in queue_ids]
    reading = threading.Thread(target=read_thread, args=(read_file, the_set, y, queue_dict))
    writing = threading.Thread(target=write_thread, args=(write_file, dataset_out, out_queue, number_of_cores))
    reading.start()
    for process in processes:
        process.start()
    writing.start()
    reading.join()
    for process in processes:
        process.join()
    writing.join()
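For completeness, I call it roughly like this (the file paths and dataset names here are just placeholders; the guard is needed because of multiprocessing):

if __name__ == '__main__':
    main("/path/to/input.h5", "/path/to/output.h5", "input_dataset", "output_dataset")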
What do you think? Should I use a different architecture for this case, or can I make adjustments to this one? Where should I start to make it faster?