1
import asyncio
import itertools

from flask import Flask, request, jsonify
from sentence_transformers import SentenceTransformer

# Module-level event loop shared by add_to_prio_queue (future creation),
# embed_sentences (run_until_complete) and the __main__ block (create_task).
loop = asyncio.get_event_loop()
app = Flask(__name__)

# Queue of two-element packages: an ordering key followed by a
# (sentences, future) payload. Filled by add_to_prio_queue and drained by
# embed_task_loop.
# NOTE(review): created after `loop` on purpose or by accident? On older
# Python versions asyncio queues bind to the current loop at construction —
# confirm before reordering.
prio_queue = asyncio.PriorityQueue()




# Load the SentenceTransformer model
# (done once at import time; embed_task_loop calls model.encode on it)
model = SentenceTransformer('all-MiniLM-L6-v2')

async def embed_task_loop():
    """Drain ``prio_queue`` forever, resolving each job's future.

    Every queue entry is a two-element package: an ordering key (ignored
    here) and a ``(sentences, future)`` payload. The sentences are encoded
    with the module-level ``model`` and the future receives a
    ``(response, status)`` tuple: status 200 with the texts and their
    embeddings on success, or status 500 with the error message if anything
    in the encoding/serialisation step raises.

    This coroutine never returns.
    """
    while True:
        # The first element only determines queue order; the work is in the
        # second element.
        _, job = await prio_queue.get()
        texts, pending = job
        try:
            vectors = model.encode(texts)

            # Pair the input texts with their embeddings in a JSON-friendly
            # structure (tolist() turns the encoder output into plain lists).
            body = {
                'texts': texts,
                'embeddings': vectors.tolist()
            }
            reply = (jsonify(body), 200)
            pending.set_result(reply)
        except Exception as exc:
            # Report the failure to the waiting caller instead of letting it
            # escape from the worker loop.
            failure = (jsonify(error=str(exc)), 500)
            pending.set_result(failure)


# Monotonic sequence number used as a tie-breaker inside the queue's ordering
# key: packages of equal priority are served first-in-first-out and the
# (sentences, future) payload is never compared (futures are not orderable).
_enqueue_counter = itertools.count()


async def add_to_prio_queue(sentences, prio):
    """Enqueue ``sentences`` for embedding and wait for the resulting response.

    Args:
        sentences: Texts to embed; handed to the worker unchanged.
        prio: ``1`` for high priority (always accepted) or ``0`` for low
            priority (accepted only while fewer than 10 packages are
            queued). Any other value is rejected.

    Returns:
        The ``(response, status)`` tuple stored in the future: whatever the
        worker sets for accepted work, or a 429 "Too many requests" response
        when the request is rejected.
    """
    # Create the future before branching: the rejection path below also needs
    # it (previously it was only bound in the accept path, so rejecting a
    # request raised UnboundLocalError instead of returning 429).
    fut = loop.create_future()

    # add to prio queue always if prio is one, if prio is zero add only if
    # prio queue is not larger than 10
    if prio == 1 or (prio == 0 and prio_queue.qsize() < 10):
        # asyncio.PriorityQueue hands out the *lowest* entry first, so the
        # priority is negated to make prio 1 jump ahead of prio 0. The package
        # stays a two-element tuple (ordering key, payload), which is the
        # shape the consumer unpacks.
        order_key = (-prio, next(_enqueue_counter))
        prio_queue.put_nowait((order_key, (sentences, fut)))
    else:
        fut.set_result((jsonify(error='Too many requests'), 429))

    return await fut

@app.route('/embed', methods=['POST'])
def embed_sentences():
    """Handle ``POST /embed``: queue the submitted texts and return the result.

    Expects a JSON object with:
        texts: non-empty list of sentences to embed.
        prio:  optional priority, defaults to ``0``.

    Returns:
        A ``(response, status)`` tuple: 400 if the body is not a JSON object
        or contains no sentences, otherwise whatever ``add_to_prio_queue``
        resolves to.
    """
    # force=True parses the body as JSON regardless of the Content-Type header.
    data = request.get_json(force=True)

    # A syntactically valid JSON body can still be a list, string, number or
    # null; only an object supports the .get() lookups below. Reject anything
    # else with a client error instead of failing with AttributeError (500).
    if not isinstance(data, dict):
        return jsonify(error='Request body must be a JSON object'), 400

    sentences = data.get('texts', [])
    prio = data.get('prio', 0)

    if not sentences:
        return jsonify(error='No sentences provided'), 400

    # NOTE(review): run_until_complete raises
    # "RuntimeError: This event loop is already running" when another request
    # is already driving the shared loop, so this is only safe while requests
    # are handled strictly one at a time.
    result = loop.run_until_complete(add_to_prio_queue(sentences, prio))
    return result
 


if __name__ == '__main__':

    # Schedule the embed worker on the module-level loop while an application
    # context is pushed.
    # NOTE(review): create_task only schedules the coroutine; the only place
    # this file runs the loop is loop.run_until_complete in embed_sentences —
    # confirm the worker makes progress as intended.
    with app.app_context():
        loop.create_task(embed_task_loop())

    # Start the Flask server; the event loop itself is not started here.
    app.run()

I have this code that contains two parts: an API that takes in sentences to be embedded (you only need to know that embedding is a process that takes a while) and adds them to a priority queue, and a worker loop that processes the queue. High-priority tasks are always processed first, and low-priority tasks may be rejected. The embedding threads work simultaneously, and enqueuing an embedding task returns a future that can be awaited. Unfortunately, Flask and asyncio really don't like each other, so if I try to use `await` instead of `loop.run_until_complete` I get this error:

RuntimeError: Task <Task pending name='Task-9' coro=<AsyncToSync.main_wrap() running at /home/user/miniconda3/envs/tf/lib/python3.9/site-packages/asgiref/sync.py:353> cb=[_run_until_complete_cb() at /home/user/miniconda3/envs/tf/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop

With `loop.run_until_complete` it sort of works, but it regularly gives me errors like these (most likely because the threads start to overlap):

Traceback (most recent call last):
  File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 2529, in wsgi_app
    response = self.full_dispatch_request()
  File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 1825, in full_dispatch_request
    rv = self.handle_user_exception(e)
  File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 1823, in full_dispatch_request
    rv = self.dispatch_request()
  File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 1799, in dispatch_request
    return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
  File "/home/user/projects/vector-crawler/backend/src/embed.py", line 78, in embed_sentences
    result = loop.run_until_complete(add_to_prio_queue(sentences, prio))
  File "/home/user/miniconda3/envs/tf/lib/python3.9/asyncio/base_events.py", line 623, in run_until_complete
    self._check_running()
  File "/home/user/miniconda3/envs/tf/lib/python3.9/asyncio/base_events.py", line 583, in _check_running
    raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running

It also crashes pretty hard on exit, although that's not that big of a deal. So is there a clean way to handle and await futures in Flask? Or is there another way to solve this?

user2741831
  • 2,120
  • 2
  • 22
  • 43
  • 1
    I used Quart instead; it's better – user2741831 Aug 06 '23 at 13:43
  • In order not to interfere with the rest of the Flask/WSGI, you probably should spawn a separate thread and run your asyncio event loop there. To communicate with that loop look e.g. at the examples here: https://stackoverflow.com/q/32059732/5378816 – VPfB Aug 06 '23 at 15:09
  • Problem is then I cannot use futures since they are made in a different event loop – user2741831 Aug 07 '23 at 05:51
  • There are a couple of Flask-like web frameworks that use async functions as views by default, such as `quart`, `sanic` and even `FastAPI` - each of these will be compatible with what you need there out of the box. You really should check another option. What you want can be done, but this "impedance" simply will not go away. – jsbueno Aug 07 '23 at 18:27

0 Answers0