I ran into a problem when trying to use multiprocessing in Python to prepare batches of my data. I used a multiprocessing.Pool
to start multiple processes, each of which prepares one batch of my data, and a multiprocessing.Manager
to create a Queue
that holds the processed batches so the main process can retrieve them. My goal is to have multiple processes preparing data simultaneously, so I call pool.apply_async
to add each task to the pool.
However, I found that the tasks run serially rather than in parallel.
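Before showing my real code, here is a minimal sketch of the pattern I expected to work: several pool workers fill a Manager queue concurrently while the main process consumes it. The toy_task function and all the numbers here are hypothetical stand-ins for my batch preparation, not my real code:

import os
import time
from multiprocessing import Manager, Pool

def toy_task(i, queue):
    # stand-in for CPU-heavy batch preparation
    print('task {} starts in pid {}'.format(i, os.getpid()))
    time.sleep(2)
    queue.put(i)

if __name__ == '__main__':
    with Manager() as manager:
        q = manager.Queue(maxsize=40)
        with Pool(4) as pool:
            for i in range(8):
                pool.apply_async(toy_task, (i, q), error_callback=print)
            pool.close()
            # consume while the workers are still producing
            results = [q.get() for _ in range(8)]
            pool.join()
    # with real parallelism the 8 tasks should finish in ~4 s, not ~16 s
    print(results)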
Here's my multiprocessing code:
from multiprocessing import Manager, Pool

for epoch in range(num_epochs):
    timer = d2l.Timer()
    """write to queue"""
    ma1 = Manager()
    q = ma1.Queue(maxsize=40)
    p = Pool(32)
    for i in range(giveashuffleindx.nb_updates_per_epoch):
        p.apply_async(write_to_queue,
                      (trainset, trainset_volume_manager, batchsize, max_length, i,
                       giveashuffleindx.shuffledata_indices, q, False),
                      error_callback=print_error)
    print('Waiting for all subprocesses done...')
    p.close()
    metric = d2l.Accumulator(2)
    """get data from queue and then send to model"""
    t = 0
    while t < giveashuffleindx.nb_updates_per_epoch:
        batch = q.get(True)
        print('count {} from {}'.format(t, q))
        optimizer.zero_grad()
        X, Y, Mask, batch_len = [x.to(devices) for x in batch]
        X = X.to(torch.float32)
        state = net.init_state()
        Y_hat, _ = net(X, state)
        B, T, C = Y_hat.shape
        Y_hat = Y_hat.view(B * T, C)
        Y = Y.view(B * T, C)
        l = F.cross_entropy(Y_hat, Y)
        l.backward()
        d2l.grad_clipping(net, 1)
        optimizer.step()
        with torch.no_grad():
            metric.add(l, X.size(0))
        t += 1
    p.join()
Here's the task function added to the pool:
def write_to_queue(traindata, volume_manager, batch_size, pad_len,
                   batch_count, indices, queue, use_argument=False):
    print('write batch{} to {}, pid is {}, {}'.format(batch_count, queue, os.getpid(), time.ctime()))
    start_time = time.time()
    # indices of the samples belonging to this batch
    start = batch_count * batch_size
    end = min((batch_count + 1) * batch_size, len(traindata))
    batch_indices = indices[slice(start, end)]
    batch = prepare_batch(traindata, volume_manager, batch_indices, pad_len, use_argument)
    queue.put(batch)
    end_time = time.time()
    print('task {} runs {} seconds'.format(batch_count, end_time - start_time))
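To narrow down where each task spends its time, I could split the timing between building the batch and pushing it through the Manager queue. This is just a timed variant of the function above; the extra prints are hypothetical additions, not my real code:

def write_to_queue_timed(traindata, volume_manager, batch_size, pad_len,
                         batch_count, indices, queue, use_argument=False):
    start = batch_count * batch_size
    end = min((batch_count + 1) * batch_size, len(traindata))
    batch_indices = indices[slice(start, end)]
    t0 = time.time()
    batch = prepare_batch(traindata, volume_manager, batch_indices, pad_len, use_argument)
    t1 = time.time()
    # a Manager queue pickles the batch and ships it through the manager
    # process, which can be slow for large tensors
    queue.put(batch)
    t2 = time.time()
    print('task {}: prepare {:.1f}s, put {:.1f}s'.format(batch_count, t1 - t0, t2 - t1))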
Here's the output printed in the terminal:
main process is 21793
core number is 40
write batch0 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25649, Mon Apr 17 11:58:08 2023
task 0 runs 9.252592325210571 seconds
write batch1 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25651, Mon Apr 17 11:58:33 2023
task 1 runs 9.321570873260498 seconds
write batch2 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25656, Mon Apr 17 11:58:55 2023
task 2 runs 10.784194707870483 seconds
write batch3 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25659, Mon Apr 17 11:59:16 2023
task 3 runs 9.447248220443726 seconds
write batch4 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25664, Mon Apr 17 11:59:39 2023
task 4 runs 9.474673986434937 seconds
Judging by the timestamps, the "write batch" lines appear roughly 21-25 seconds apart even though each task itself only runs for 9-11 seconds, so the tasks seem to execute serially, and in this case multiprocessing isn't accelerating anything at all, right? How can I make the tasks in the pool actually run simultaneously?
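One isolation test I'm considering (a sketch with a hypothetical busy-work function): drop the queue entirely and check whether plain apply_async calls overlap. If they do overlap here, the serialization in my real code presumably comes from how the batches travel through the Manager queue rather than from the pool itself.

import time
from multiprocessing import Pool

def cpu_only(i):
    t0 = time.ctime()
    s = sum(j * j for j in range(10_000_000))  # pure CPU work, no queue involved
    print('task {} started {}, finished {}'.format(i, t0, time.ctime()))
    return s

if __name__ == '__main__':
    with Pool(4) as p:
        for i in range(8):
            p.apply_async(cpu_only, (i,))
        p.close()
        p.join()

If the start timestamps printed by cpu_only overlap in groups of four, the pool itself is parallelizing fine.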