
I ran into a problem when trying to use multiprocessing in Python to prepare batches of my data. I used a multiprocessing.Pool to start multiple processes, each of which processes one batch of my data, and a multiprocessing.Manager to create a Queue that holds the processed batches so the main process can fetch them. My goal is to have multiple processes preparing data simultaneously, so I use pool.apply_async to add tasks to the pool. However, I found that all tasks run serially rather than in parallel.
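
To show the pattern I mean, here is a stripped-down, self-contained sketch (worker and its time.sleep are placeholders standing in for my real batch preparation, not my actual code); with small arguments like these, the sleeps overlap and a 4-process pool finishes the eight tasks in roughly two rounds:

from multiprocessing import Manager, Pool
import os
import time

def worker(i, q):
    # stand-in for a batch-preparation task
    print('task {} starts in pid {} at {}'.format(i, os.getpid(), time.ctime()))
    time.sleep(2)
    q.put(i)

if __name__ == '__main__':
    m = Manager()
    q = m.Queue(maxsize=40)   # managed queue proxies can be passed to Pool workers
    p = Pool(4)
    for i in range(8):
        p.apply_async(worker, (i, q), error_callback=print)
    p.close()
    results = [q.get(True) for _ in range(8)]
    p.join()
    print(results)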

Here's my multiprocessing code:

for epoch in range(num_epochs):
    timer = d2l.Timer()

    # write to queue
    from multiprocessing import Manager, Pool
    ma1 = Manager()
    q = ma1.Queue(maxsize=40)
    p = Pool(32)
    for i in range(giveashuffleindx.nb_updates_per_epoch):
        p.apply_async(write_to_queue,
                      (trainset, trainset_volume_manager, batchsize, max_length, i,
                       giveashuffleindx.shuffledata_indices, q, False),
                      error_callback=print_error)
    print('Waiting for all subprocesses done...')
    p.close()

    metric = d2l.Accumulator(2)

    # get data from queue and then send to model
    t = 0
    while t < giveashuffleindx.nb_updates_per_epoch:
        batch = q.get(True)
        print('count {} from {}'.format(t, q))
        optimizer.zero_grad()
        X, Y, Mask, batch_len = [x.to(devices) for x in batch]
        X = X.to(torch.float32)
        state = net.init_state()
        Y_hat, _ = net(X, state)
        B, T, C = Y_hat.shape
        Y_hat = Y_hat.view(B * T, C)
        Y = Y.view(B * T, C)
        l = F.cross_entropy(Y_hat, Y)
        l.backward()
        d2l.grad_clipping(net, 1)
        optimizer.step()
        with torch.no_grad():
            metric.add(l, X.size(0))
        t += 1
    p.join()

Here's the task added to the pool:

def write_to_queue(traindata, volume_manager, batch_size, pad_len, batch_count, indices, queue, use_argument=False):
    print('write batch{} to {},pid is {}, {}'.format(str(batch_count), queue, os.getpid(), time.ctime()))
    start_time = time.time()
    # pick out the indices that belong to this batch
    start = batch_count * batch_size
    end = min((batch_count + 1) * batch_size, len(traindata))
    batch_indices = indices[slice(start, end)]

    batch = prepare_batch(traindata, volume_manager, batch_indices, pad_len, use_argument)

    queue.put(batch)
    end_time = time.time()
    print('task {} runs {} seconds'.format(str(batch_count), end_time - start_time))

Here's the output printed in the terminal:

main process is 21793
core number is 40
write batch0 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25649, Mon Apr 17 11:58:08 2023
task 0 runs 9.252592325210571 seconds
write batch1 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25651, Mon Apr 17 11:58:33 2023
task 1 runs 9.321570873260498 seconds
write batch2 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25656, Mon Apr 17 11:58:55 2023
task 2 runs 10.784194707870483 seconds
write batch3 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25659, Mon Apr 17 11:59:16 2023
task 3 runs 9.447248220443726 seconds
write batch4 to <queue.Queue object at 0x7f32e0111cd0>,pid is 25664, Mon Apr 17 11:59:39 2023
task 4 runs 9.474673986434937 seconds

It seems that the tasks run serially, so in this case multiprocessing isn't accelerating my work at all, right? How can I make the tasks in the pool run simultaneously?

yi yang
  • I don't know the real reason, but when I tried to reduce the number of arguments passed to `Pool.apply_async()` by treating most of them as global variables, the subprocesses in the pool do execute at the same time. – yi yang Apr 18 '23 at 09:44
  • How big are the return results that you're writing to the queue? Managed queues are serialized, and if most of the work of the task is just to write a large result, you won't get much of a savings from multitasking. – Frank Yellin Apr 18 '23 at 17:42
  • I don't know exactly how big it is, but when I double the batch size, it does wait longer for the next subprocess to start executing. However, if I pass arguments in the way I mentioned above, 40 tasks in the pool can start simultaneously. What's more, when I double the batch size, it only waits the same amount of additional time before the next 40 subprocesses begin. – yi yang Apr 21 '23 at 09:16
  • @Frank Yellin, sincerely thank you for your reply! – yi yang Apr 21 '23 at 09:18
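
For anyone who finds this later, here is a minimal sketch of the workaround from my first comment: hand the large, constant objects to each worker once via the Pool initializer, so every apply_async call only has to pickle the small per-task arguments. The names init_worker and write_to_queue_small are made up for illustration; prepare_batch is my existing function:

from multiprocessing import Manager, Pool

_traindata = None
_volume_manager = None
_indices = None
_queue = None

def init_worker(traindata, volume_manager, indices, queue):
    # runs once in each worker process, so the big objects are
    # transferred once per worker instead of once per task
    global _traindata, _volume_manager, _indices, _queue
    _traindata = traindata
    _volume_manager = volume_manager
    _indices = indices
    _queue = queue

def write_to_queue_small(batch_size, pad_len, batch_count, use_argument=False):
    # same job as write_to_queue, but reads the large objects from globals
    start = batch_count * batch_size
    end = min((batch_count + 1) * batch_size, len(_traindata))
    batch_indices = _indices[start:end]
    batch = prepare_batch(_traindata, _volume_manager, batch_indices, pad_len, use_argument)
    _queue.put(batch)

# per epoch, only small arguments cross the process boundary per task:
# q = Manager().Queue(maxsize=40)
# p = Pool(32, initializer=init_worker,
#          initargs=(trainset, trainset_volume_manager,
#                    giveashuffleindx.shuffledata_indices, q))
# for i in range(giveashuffleindx.nb_updates_per_epoch):
#     p.apply_async(write_to_queue_small, (batchsize, max_length, i, False),
#                   error_callback=print_error)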

0 Answers