from flask import Flask, request, jsonify
from sentence_transformers import SentenceTransformer
import asyncio
loop = asyncio.get_event_loop()
app = Flask(__name__)
prio_queue = asyncio.PriorityQueue()
# Load the SentenceTransformer model
model = SentenceTransformer('all-MiniLM-L6-v2')
async def embed_task_loop():
while True:
# get next item
prio, p= await prio_queue.get()
sentences, fut = p
try:
# Encode the sentences using the model
embeddings = model.encode(sentences)
# Create a dictionary to hold the sentence-embedding pairs
result = {
'texts': sentences,
'embeddings': embeddings.tolist()
}
#return jsonify(result), 200
fut.set_result((jsonify(result), 200))
except Exception as e:
#return jsonify(error=str(e)), 500
fut.set_result((jsonify(error=str(e)), 500))
async def add_to_prio_queue(sentences, prio):
global prio_queue
# add to prio queue always if prio is one, if prio is zero add only if prio queue is not larger than 10
if prio == 1 or (prio == 0 and prio_queue.qsize() < 10):
fut=loop.create_future()
package = (prio, (sentences, fut))
prio_queue.put_nowait(package)
else:
fut.set_result((jsonify(error='Too many requests'), 429))
return await fut
@app.route('/embed', methods=['POST'])
def embed_sentences():
# Get the list of sentences from the request body
data = request.get_json(force=True)
sentences = data.get('texts', [])
prio= data.get('prio', 0)
if not sentences:
return jsonify(error='No sentences provided'), 400
result = loop.run_until_complete(add_to_prio_queue(sentences, prio))
return result
if __name__ == '__main__':
# start the embed task loop
with app.app_context():
loop.create_task(embed_task_loop())
app.run()
I have this code that contains two parts, an api that takes in sentences to be embedded (you only need to know that that is a process that takes a while) and adds them to a priority queue. High priority tasks are always processed first and low priority tasks may be rejected. The embedding threads work simultaneously and enqueuing an embedding task will return a future that can be awaited. Unfortunately, Flask and asyncio really don't like each other and so if I try to use await
instead of loop.run_until_complete
I get this error:
RuntimeError: Task <Task pending name='Task-9' coro=<AsyncToSync.main_wrap() running at /home/user/miniconda3/envs/tf/lib/python3.9/site-packages/asgiref/sync.py:353> cb=[_run_until_complete_cb() at /home/user/miniconda3/envs/tf/lib/python3.9/asyncio/base_events.py:184]> got Future <Future pending> attached to a different loop
with loop.run_until_complete
It sort of kind of works but it regularly gives me errors like these (most likely because the threads start to overlap):
Traceback (most recent call last):
File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 2529, in wsgi_app
response = self.full_dispatch_request()
File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 1825, in full_dispatch_request
rv = self.handle_user_exception(e)
File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 1823, in full_dispatch_request
rv = self.dispatch_request()
File "/home/user/miniconda3/envs/tf/lib/python3.9/site-packages/flask/app.py", line 1799, in dispatch_request
return self.ensure_sync(self.view_functions[rule.endpoint])(**view_args)
File "/home/user/projects/vector-crawler/backend/src/embed.py", line 78, in embed_sentences
result = loop.run_until_complete(add_to_prio_queue(sentences, prio))
File "/home/user/miniconda3/envs/tf/lib/python3.9/asyncio/base_events.py", line 623, in run_until_complete
self._check_running()
File "/home/user/miniconda3/envs/tf/lib/python3.9/asyncio/base_events.py", line 583, in _check_running
raise RuntimeError('This event loop is already running')
RuntimeError: This event loop is already running
It also crashes pretty hard on exit, although thats not that big of a deal So is there a clean way to handle and await promises in Flask? Or is there another way to solve this?