
Hey guys, so I'm building a webhook using Flask, and I'm currently creating a new thread for every request so that the thread can do some heavy processing without making the request take too long to return. The problem is that at some point I have so many threads open that it starts causing problems, and I was wondering if I could set up some kind of queue in Flask to limit the app to creating only, say, 8 threads at a time.

My code:

import os
import json
from threading import Thread

import boto3
from flask import Flask, request

app = Flask(__name__)

def process_audio(bucket_name, key, _id, extension):
    S3_CLIENT = boto3.client('s3', region_name=S3_REGION)
    print('Running audio processing')
    INPUT_FILE = os.path.join(TEMP_PATH, f'{_id}.{extension}')
    print(f'Saving downloaded file to {INPUT_FILE}')
    S3_CLIENT.download_file(bucket_name, key, INPUT_FILE)
    print('File downloaded')
    process = stt.process_audio(INPUT_FILE)
    print(f'Audio processed by AI returned: "{process}"')
    stt.reset()
    ai = get_sentimentAI_results(process)
    if ai:
        print(f'Text processed by AI returned class {ai[0]} with a certainty of {ai[1]}%')
        return True

    print('Request to sentiment AI endpoint failed for an unknown reason. Check CloudWatch for more information!')
    return False

@app.route('/process/audio', methods=['POST'])
def process_new_audio():
    print('Receiving new request')
    data = request.data

    if not data:
        return '', 404

    data = json.loads(data)
    bucket_name = data.get('bucket_name')
    key = data.get('key')
    _id = data.get('id')
    extension = data.get('file_extension')

    if not key or not bucket_name or not _id or not extension:
        return '', 404

    thread = Thread(target=process_audio, kwargs={'bucket_name': bucket_name, 'key': key, '_id': _id, 'extension': extension})
    thread.start()
    
    return '', 200

Summary of the problem:

This Flask function works as a webhook triggered by an AWS Lambda; it creates a thread to process the data without making the Lambda wait for it to finish. I just need a way to create a queue so that if 100 requests are made, I don't have 100 threads running but only, say, 5 at a time.

DeadSec

2 Answers


This is a pretty common problem in multi-threading, and the solution you need here is called the producer-consumer model: a producer (the entity that creates work) pushes work into a (thread-safe) queue, and consumers (worker threads) pop work off the queue one by one until it is empty.

Doing this limits the number of worker threads. One neat way to do it is with the concurrent.futures library available in Python. @aaron has given an appropriate link for that.
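As a minimal sketch of the idea (the pool size of 8 and the function names here are illustrative, not from the original code): `ThreadPoolExecutor` is exactly a producer-consumer setup, with the pool's internal queue playing the role of the work queue. Submissions beyond `max_workers` simply wait in that queue instead of spawning new threads.

```python
from concurrent.futures import ThreadPoolExecutor

# A single module-level executor caps concurrency: at most 8 worker
# threads run at once; extra submissions wait in the internal queue.
executor = ThreadPoolExecutor(max_workers=8)

def heavy_task(n):
    # stand-in for the real heavy processing (e.g. process_audio)
    return n * n

def handle_request(n):
    # submit() returns immediately with a Future; the work is queued
    # and picked up by one of the 8 workers when one is free.
    return executor.submit(heavy_task, n)

# even with 100 "requests", only 8 threads ever run at a time
futures = [handle_request(i) for i in range(100)]
results = [f.result() for f in futures]
```

In the Flask view from the question, this would mean replacing `Thread(target=process_audio, ...).start()` with `executor.submit(process_audio, bucket_name, key, _id, extension)` and returning immediately, as before.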

Zoe
Anuraag Barde

I have dealt with a similar-sounding service, and we solved the task-queueing problem by adding a Celery app to our Flask server. The threads that receive the requests generate a Celery task and queue it, to be handled whenever the Celery app has enough resources. We followed this tutorial:

https://flask.palletsprojects.com/en/2.0.x/patterns/celery/

If you have any issues implementing this, feel free to ask with your progress.

Avishay Cohen