-1

I am bulding a python script that uses asyncio and 3 queues. I am processing data from different sources in 4 steps and the idea is to use the queues to save the results from one step to be used in the next step as soons as it possible. The script is doing what it should but for some reason I am not figuring out when all the data has been processed the script don't finish. To try to understant the problem I build a simplified version of the script where I do simple math operations.

First I populate the first queue with 50 randon number between 0 and 10. Next I get the numbers stored in queue1, square it and put the result on queue2. Next I get the squared number stored in queue2, double it and store the result in queue3. Finally I get the final result stored in queue3 and append it to a dataframe and save the result to a file.

As I said. The procedure described above works but when I finish processing all the elemens queue3 I was expecting that the procedure would finish.

This my first version of the toy code I build to demonstrate my problem

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

def __init__(self):
    self.df = pd.DataFrame(columns=['id','final_value'])

async def generate_random_number(self,i:int,queue):
    for k in range(50):
        r=random.randint(0,10)
        #await asyncio.sleep(r)
        await queue.put((k,r))

async def square_it(self,n,queue1,queue2):
    while True:
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
        r=await queue1.get()
        await asyncio.sleep(5)
        await queue2.put((r[0],r[1]*r[1]))
        queue1.task_done()
        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')

async def double_it(self,n,queue2,queue3):
    while True:
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
        r=await queue2.get()
        await asyncio.sleep(10)
        await queue3.put((r[0],2*r[1]))
        queue2.task_done()
        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')

async def save_it(self,n,queue3):
    while True:
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
        r=await queue3.get()
        await asyncio.sleep(1)
        self.df.loc[len(self.df)]=[r[0],r[1]]
        self.df.to_csv('final_result.csv')
        queue3.task_done()
        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')

async def main(self):
    queue1 = asyncio.Queue() # store the randon number
    queue2 = asyncio.Queue() # stores the squared number
    queue3 = asyncio.Queue() # store the final result

    rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
    square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
    double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
    save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]
    
    await asyncio.gather(*rand_gen)
    await asyncio.gather(*square_scan)
    await asyncio.gather(*double_scan)
    await asyncio.gather(*save_scan)

    await queue1.join()
    await queue2.join() 
    await queue3.join()

    for a in square_scan:
        a.cancel()

    for b in square_scan:
        b.cancel()

    for c in save_scan:
        c.cancel()

### testing
if __name__ == '__main__':

    toy=asyncio_toy()

    asyncio.run(toy.main())

Doing some research about this problem I found this another question

[1]: Using Multiple Asyncio Queues Effectively which suggest not use of queue.join and use of sentinel shutdonw.

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

def __init__(self):
    self.df = pd.DataFrame(columns=['id','final_value'])

async def generate_random_number(self,i:int,queue1):
    for k in range(50):
        r=random.randint(0,10)
        queue1.put_nowait((k,r))
    queue1.put_nowait(None)

async def square_it(self,n,queue1,queue2):
    while True:
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
        r=await queue1.get()
        if r is None:
            await queue2.put(None)
            break

        await asyncio.sleep(5)
        await queue2.put((r[0],r[1]*r[1]))
        queue1.task_done()
        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')

async def double_it(self,n,queue2,queue3):
    while True:
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
        r=await queue2.get()
        if r is None:
            await queue3.put(None)
            break

        await asyncio.sleep(10)
        await queue3.put((r[0],2*r[1]))
        queue2.task_done()
        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')

async def save_it(self,n,queue3):
    while True:
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
        r=await queue3.get()
        if r is None:
            break

        await asyncio.sleep(1)
        self.df.loc[len(self.df)]=[r[0],r[1]]
        self.df.to_csv('final_result.csv')
        queue3.task_done()
        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')

async def main(self):
    queue1 = asyncio.Queue() # store the randon number
    queue2 = asyncio.Queue() # stores the squared number
    queue3 = asyncio.Queue() # store the final result

    rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
    square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
    double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
    save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]
    
    await asyncio.gather(*rand_gen)
    await asyncio.gather(*square_scan)
    await asyncio.gather(*double_scan)
    await asyncio.gather(*save_scan)

    for a in square_scan:
        a.cancel()

    for b in square_scan:
        b.cancel()

    for c in save_scan:
        c.cancel()

### testing
if __name__ == '__main__':

    toy=asyncio_toy()

    asyncio.run(toy.main())

But it didn't solve the problem. I have also try to remove the functions from the class definition but it didn't work as well.

I am starting working with asyncio module and I think I am doing some basic mistake that I am not able to see. Any tips will be welcome.

UPDATE

I have simplified the problem even further and got some interesting effect that can lead to the answer. I created another toy code that use just one queue where I store the initial randon number. The code get the number from this queue squared it and print in termial. This peace of code finish. So I think that maybe the problem is related, in some way, to the fact that I am using more than one queue.

import asyncio
import random

class asyncio_toy():

    def __init__(self):
        ...

    async def generate_random_number(self,i:int,queue):
        for _ in range(i):
            r=random.randint(0,5)
            await asyncio.sleep(r)
            await queue.put((i,r))
    
    async def square_scan(self,k,queue):
        while True:
            (i,r) = await queue.get()
            print(f'prod {i} - cons {k} - {r} - {r*r}')
            queue.task_done()

    async def main(self):
        queue = asyncio.Queue()
        prod = [asyncio.create_task(self.generate_random_number(n,queue)) for n in range(5)]
        cons = [asyncio.create_task(self.square_scan(k,queue)) for k in range(4)]
        
        await asyncio.gather(*prod)
        await queue.join() 
        
        for c in cons:
            c.cancel()

### testing
if __name__ == '__main__':

    toy=asyncio_toy()

    asyncio.run(toy.main())
Dariva
  • 330
  • 2
  • 13
  • as for me problem can be because many threads use the same queue - and this can make conflict. If you send `None` then only one thread will get it and only one tread will run `break` - and other threads will still run. – furas Jul 09 '21 at 16:08
  • i would create single function which do all calculation for single value - `square_it` , `double_it`, `save_it` - because they depends one on another and using threads may not help. And then I would run it in threads. And if you create 5 threads then I would send 5 `None` to stop them. – furas Jul 09 '21 at 16:12
  • you have mistake - you run `for ... in square_scan: cancel()` two times and you forgot it for `double_scan` – furas Jul 09 '21 at 16:20
  • code works for me if I send `None` five times - `for x in range(5): put(None)` - and if I remove all `queue.join()` – furas Jul 09 '21 at 16:25

2 Answers2

1

Code works for me if I send None five times because there is five functions which use the same queue and all of them need None to exit while-loop.

 for x in range(5): 
     queue.put(None)

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

    def __init__(self):
        self.df = pd.DataFrame(columns=['id','final_value'])

    async def generate_random_number(self, i:int, queue):
        for k in range(10):
            r = random.randint(0, 10)
            #await asyncio.sleep(r)
            await queue.put((k,r))
            
        for x in range(5):
            await queue.put(None)
            
        
    async def square_it(self,n,queue1,queue2):
        while True:
            #print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
            r = await queue1.get()
            
            if r is None:
                print('exit: SQUARE IT', n)  
                await queue2.put(None)
                break
            
            k, r = r
            await asyncio.sleep(1)
            await queue2.put((k, r*r))
            queue1.task_done()
            #print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2:{str(queue2.qsize()).zfill(2)}')
    
    async def double_it(self,n,queue2,queue3):
        while True:
            #print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
            r = await queue2.get()
            
            if r is None:
                print('exit: DOUBLE IT', n)  
                await queue3.put(None)
                break
            
            k, r = r
            await asyncio.sleep(1)
            await queue3.put((k, 2*r))
            queue2.task_done()
            #print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3:{str(queue3.qsize()).zfill(2)}')
    
    async def save_it(self,n,queue3):
        while True:
            #print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
            r = await queue3.get()
            
            if r is None:
                print('exit: SAVE IT', n)  
                break
            
            k, r = r
            await asyncio.sleep(1)
            self.df.loc[len(self.df)]=[k, r]
            self.df.to_csv('final_result.csv')
            queue3.task_done()
            #print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')
    
    async def main(self):
        queue1 = asyncio.Queue() # store the randon number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # store the final result
    
        rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(5)]
        double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(5)]
        save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(5)]
        
        await asyncio.gather(*rand_gen)
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)

        print('join')
        
        #await queue1.join()
        #await queue2.join() 
        #await queue3.join()
    
        print('cancel')
        for a in square_scan:
            a.cancel()
    
        for b in double_scan:
            b.cancel()
    
        for c in save_scan:
            c.cancel()
    
### testing
if __name__ == '__main__':

    toy = asyncio_toy()

    asyncio.run(toy.main())

EDIT:

Version which uses while running and running = False to stop all threads.

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

class asyncio_toy():

    def __init__(self):
        self.df = pd.DataFrame(columns=['id','final_value'])
        self.running_square_it = True
        self.running_double_it = True
        self.running_save_it = True
        
    async def generate_random_number(self, i, queue):
        for k in range(20):
            r = random.randint(0, 10)
            #await asyncio.sleep(r)
            await queue.put((k, r))
            
        #for x in range(5):
        await queue.put(None)
            
        
    async def square_it(self, n, queue_input, queue_output):
        print(f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')
 
        while self.running_square_it:
        
            if not queue_input.empty():
                r = await queue_input.get()
            
                if r is None:
                    print('exit: SQUARE IT', n)  
                    await queue_output.put(None)
                    self.running_square_it = False
                else:
                    k, r = r
                    await queue_output.put((k, r*r))
                    
            await asyncio.sleep(0.1)  # need it to run other loops
            
        print(f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue_input.qsize()).zfill(2)} - q2:{str(queue_output.qsize()).zfill(2)}')
    
    
    async def double_it(self, n, queue_input, queue_output):
        print(f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')
        
        while self.running_double_it:

            if not queue_input.empty():
                r = await queue_input.get()
            
                if r is None:
                    print('exit: DOUBLE IT', n)  
                    await queue_output.put(None)
                    self.running_double_it = False
                else:
                    k, r = r
                    await queue_output.put((k, 2*r))

            await asyncio.sleep(0.1)  # need it to run other loops
            
        print(f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue_input.qsize()).zfill(2)} - q3:{str(queue_output.qsize()).zfill(2)}')
    
    
    async def save_it(self, n, queue_input):
        print(f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue_input.qsize()).zfill(2)}')
        
        while self.running_save_it:
            
            if not queue_input.empty():
                r = await queue_input.get()
                
                if r is None:
                    print('exit: SAVE IT', n)  
                    self.running_save_it = False
                else:            
                    k, r = r
                    self.df.loc[len(self.df)] = [k, r]
                    self.df.to_csv('final_result.csv')
 
            await asyncio.sleep(0.1)  # need it to run other loops
           
        print(f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue_input.qsize()).zfill(2)}')
    
    
    async def main(self):
        queue1 = asyncio.Queue() # store the randon number
        queue2 = asyncio.Queue() # stores the squared number
        queue3 = asyncio.Queue() # store the final result
    
        rand_gen    = [asyncio.create_task(self.generate_random_number(n, queue1)) for n in range(1)]
        square_scan = [asyncio.create_task(self.square_it(k, queue1, queue2)) for k in range(5)]
        double_scan = [asyncio.create_task(self.double_it(k, queue2, queue3)) for k in range(10)]
        save_scan   = [asyncio.create_task(self.save_it(k, queue3)) for k in range(5)]
        
        await asyncio.gather(*rand_gen)
        await asyncio.gather(*square_scan)
        await asyncio.gather(*double_scan)
        await asyncio.gather(*save_scan)

        print('join')
        
        #await queue1.join()
        #await queue2.join() 
        #await queue3.join()
    
        print('cancel')
        #for a in square_scan:
        #    a.cancel()
    
        #for b in double_scan:
        #    b.cancel()
    
        #for c in save_scan:
        #    c.cancel()
    
### testing
if __name__ == '__main__':
    toy = asyncio_toy()
    asyncio.run(toy.main())
furas
  • 134,197
  • 12
  • 106
  • 148
  • Nice solution! Thanks very much. I have just one question. This would work just if I have the same number of tasks for each step. Is it correct? How would you do if there are different number of tasks in each step? – Dariva Jul 09 '21 at 17:22
  • 1
    you could use `while running_square_it` instead of `while True` and then you can set `running_square_it = False` to finish all loops in all `square_it`. So when one function get `None` then it should set `running_square_it = False` to stop other threads. I would also run all code in `if not queue.empty():` so it will not block code when there is no items in queue. – furas Jul 09 '21 at 17:36
  • 1
    I added version which uses `while running` and `running = False` to stop all threads. And I uses different `range()` to create different number of threads. BTW: I would use only one thread to save data - using many threads to access the same file may create problem. – furas Jul 09 '21 at 18:25
  • Could you post how you did it? I try it here but the I am trying is not working. Ops.. I just saw that you posted it. Thanks – Dariva Jul 09 '21 at 18:31
  • There is a problem with this idea. If we simulate some variable dealy in the square and double operation eventually we can end up with empty queues but with some data been processed between them which cause the script to finish before it process all data. I come up with a solution that I will post as an answer. The basic idea is to use a token for each task in each step and monitor both if the input queue is empty and if all tokens are released. – Dariva Jul 13 '21 at 00:25
  • normally I would run all code `square_it` and `double_it` as one function because I think using separated functions can't make it faster - `double_it` depends on values from `square_it` so spliting it not make sense for me - and running more threads and sending data in queue may make it slower. – furas Jul 13 '21 at 00:33
  • For this toy example you are right. But my real application is a little more complicate. It reads data from different sources and the time to process each input in queue2 varies a lot. I would like to thank you. As I stated I neved worked with asyncio before (at least to directly). Your help make me undestand better how it works. – Dariva Jul 13 '21 at 02:24
0

The idea proposed by furas was interesting but playing with it a litte bit more I found a problem on it. If I add a random delay before executing the square and double operations (to simulate real operations) eventually we can have a situation where one of the queues is empty but some data is been processed so the script is interrupted before it process all data. To solve this problem I come up with an idea of using tokens (a simple list of boolean values) for each task so besides the size of the queue (or if it is empty or not) I also check if all tokens are released (false) and if the previous step is completed. This solution works for any number of task in each step and for any time it takes to execute each task. Here is code

import os
import warnings
import asyncio
import random
import pandas as pd
from datetime import datetime

os.environ['PYTHONASYNCIODEBUG'] = '1'
warnings.resetwarnings()

# ANSI colors
c = (
    "\033[0m",   # End of color
    "\033[36m",  # Cyan
    "\033[91m",  # Red
    "\033[35m",  # Magenta
)

class asyncio_toy():
def __init__(self,num_of_tasks):
    self.df = pd.DataFrame(columns=['id','final_value'])

    self.square_it_tolken=[False]*num_of_tasks[0]
    self.double_it_tolken=[False]*num_of_tasks[1]
    self.save_it_tolken=[False]*num_of_tasks[2]

    self.running_square = True
    self.running_double = True
    self.running_saveit = True

    self.num_of_tasks=num_of_tasks

async def generate_random_number(self,i:int,queue1):
    for k in range(50):
        r=random.randint(0,10)
        queue1.put_nowait((k,r))

async def square_it(self,n,queue1,queue2):
    while self.running_square:
        
        #if queue1.qsize()>0:
        if not queue1.empty():

            print(c[1]+ f'{datetime.now()} - START SQUARE IT task {n} q1: {str(queue1.qsize()).zfill(2)} - q2: {str(queue2.qsize()).zfill(2)}')
            self.square_it_tolken[n]=True # take a token
            print(self.square_it_tolken)

            r=await queue1.get()
            await asyncio.sleep(random.randint(15,30))
            await queue2.put((r[0],r[1]*r[1]))

            print(c[1]+ f'{datetime.now()} - END SQUARE IT   task {n} q1: {str(queue1.qsize()).zfill(2)} - q2: {str(queue2.qsize()).zfill(2)}')
            self.square_it_tolken[n]=False # return a token
            print(self.square_it_tolken)

        if queue1.empty() and all(not token for token in self.square_it_tolken):
            print('exit: SQUARE IT', n)
            self.running_square = False

        await asyncio.sleep(0.1) # need it to run other loops

async def double_it(self,n,queue2,queue3):
    while self.running_double:
        
        #if queue2.qsize()>0:
        if not queue2.empty():

            print(c[2]+ f'{datetime.now()} - START DOUBLE IT task {n} q2: {str(queue2.qsize()).zfill(2)} - q3: {str(queue3.qsize()).zfill(2)}')
            self.double_it_tolken[n]=True # take a token
            print(self.double_it_tolken)

            r=await queue2.get()
            await asyncio.sleep(random.randint(5,15))
            await queue3.put((r[0],2*r[1]))

            print(c[2]+f'{datetime.now()} - END DOUBLE IT   task {n} q2: {str(queue2.qsize()).zfill(2)} - q3: {str(queue3.qsize()).zfill(2)}')
            self.double_it_tolken[n]=False # release a token
            print(self.double_it_tolken)

        if queue2.empty() and all(not token for token in self.double_it_tolken) and not self.running_square:
            print('exit: DOUBLE IT', n)
            self.running_double = False

        await asyncio.sleep(0.1)

async def save_it(self,n,queue3):
    while self.running_saveit:
        
        #if queue3.qsize()>0:
        if not queue3.empty():

            print(c[3]+f'{datetime.now()} - START SAVE IT   task {n} q3: {str(queue3.qsize()).zfill(2)}')
            self.save_it_tolken[n]=True # take a token
            print(self.save_it_tolken)

            r=await queue3.get()
            await asyncio.sleep(random.randint(1,5))
            self.df.loc[len(self.df)]=[r[0],r[1]]
            self.df.to_csv('final_result.csv')

            print(c[3]+f'{datetime.now()} - END SAVE IT     task {n} q3: {str(queue3.qsize()).zfill(2)}')
            self.save_it_tolken[n]=False # release a token
            print(self.save_it_tolken)

        if queue3.empty() and all(not token for token in self.save_it_tolken) and not self.running_double:
            print('exit: SAVE IT', n)
            self.running_saveit = False

        await asyncio.sleep(0.1)
        
async def main(self):
    queue1 = asyncio.Queue() # store the randon number
    queue2 = asyncio.Queue() # stores the squared number
    queue3 = asyncio.Queue() # store the final result

    rand_gen = [asyncio.create_task(self.generate_random_number(n,queue1)) for n in range(1)]
    square_scan = [asyncio.create_task(self.square_it(k,queue1,queue2)) for k in range(self.num_of_tasks[0])]
    double_scan = [asyncio.create_task(self.double_it(k,queue2,queue3)) for k in range(self.num_of_tasks[1])]
    save_scan = [asyncio.create_task(self.save_it(k,queue3)) for k in range(self.num_of_tasks[2])]
    
    await asyncio.gather(*rand_gen)
    await asyncio.gather(*square_scan)
    await asyncio.gather(*double_scan)
    await asyncio.gather(*save_scan)
    

### testing
if __name__ == '__main__':

    toy=asyncio_toy([5,10,5])

    asyncio.run(toy.main())
Dariva
  • 330
  • 2
  • 13
  • I wondering if you could use counter instead of tokens - at start you would set `square_counter = num_of_tasks[0]` and every task would do `square_counter -= 1` and when you get `square_counter == 0` then you could finish task. But maybe it would need to `lock` variable when it substracts value - to correctly use it. – furas Jul 13 '21 at 12:02
  • I think it would work also using a counter. I use the token approach because during the debug I could follow more easily which task was running. – Dariva Jul 13 '21 at 13:51