I'm developing an application similar to Omegle, where users are matched with strangers based on their common interests. To do this, I'm combining Locality Sensitive Hashing (LSH) with MinHash. However, I'm having trouble implementing a waiting mechanism for users who don't immediately find a matching partner when they call the API.
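
For context, here is a minimal, stdlib-only sketch of what the MinHash + LSH combination does conceptually. It is not datasketch's implementation: the blake2b-based hash family, the split into 32 bands of 4 rows, and the names `minhash`, `estimate_jaccard` and `SimpleLSH` are assumptions made for illustration.

```python
import hashlib
from collections import defaultdict

NUM_PERM = 128
BANDS, ROWS = 32, 4  # hypothetical split; BANDS * ROWS must equal NUM_PERM


def _hash(item: str, seed: int) -> int:
    # One "permutation" = the same hash function salted with a different seed.
    digest = hashlib.blake2b(item.encode("utf-8"), digest_size=8,
                             salt=seed.to_bytes(16, "little")).digest()
    return int.from_bytes(digest, "little")


def minhash(interests: set) -> list:
    # Signature: for each seed, the minimum hash over all (non-empty) interests.
    return [min(_hash(i, seed) for i in interests) for seed in range(NUM_PERM)]


def estimate_jaccard(a: list, b: list) -> float:
    # The fraction of positions where two signatures agree estimates Jaccard similarity.
    return sum(x == y for x, y in zip(a, b)) / len(a)


class SimpleLSH:
    def __init__(self) -> None:
        # One bucket table per band: band contents -> set of user keys.
        self.tables = [defaultdict(set) for _ in range(BANDS)]

    def _bands(self, sig: list):
        for b in range(BANDS):
            yield b, tuple(sig[b * ROWS:(b + 1) * ROWS])

    def insert(self, key: str, sig: list) -> None:
        for b, band in self._bands(sig):
            self.tables[b][band].add(key)

    def query(self, sig: list) -> set:
        # Candidates are users whose signature matches in at least one whole band.
        found = set()
        for b, band in self._bands(sig):
            found |= self.tables[b].get(band, set())
        return found
```

With 32 bands of 4 rows, two users become candidates once at least one band matches exactly, which becomes likely when their Jaccard similarity rises above roughly (1/32)^(1/4), about 0.42.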

Currently, I call `time.sleep()` to wait for a while before returning the status "Failed". However, the sleep seems to block other API calls, delaying responses for other users. How do sites like Omegle handle this scenario, and what is the correct way to implement an efficient waiting mechanism?
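
To illustrate why `time.sleep()` stalls other users, here is a minimal stdlib-only sketch (no FastAPI; `blocking_wait`, `cooperative_wait` and `run_three` are hypothetical names) that simulates three simultaneous requests on one event loop, first with a blocking sleep and then with `await asyncio.sleep()`:

```python
import asyncio
import time


async def blocking_wait() -> None:
    time.sleep(0.2)  # blocks the whole event loop; no other coroutine can run


async def cooperative_wait() -> None:
    await asyncio.sleep(0.2)  # suspends only this coroutine


async def run_three(handler) -> float:
    # Simulate three users hitting the endpoint at the same moment.
    start = time.perf_counter()
    await asyncio.gather(handler(), handler(), handler())
    return time.perf_counter() - start


if __name__ == "__main__":
    print(f"time.sleep:    {asyncio.run(run_three(blocking_wait)):.2f}s")     # roughly 0.6s
    print(f"asyncio.sleep: {asyncio.run(run_three(cooperative_wait)):.2f}s")  # roughly 0.2s
```

The blocking version runs the three waits one after another, while the cooperative version overlaps them. Note that FastAPI runs plain `def` endpoints in a threadpool, so a blocking sleep there ties up one worker thread instead of the whole event loop; inside an `async def` endpoint it blocks the loop.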

Here's my code:

from fastapi import FastAPI, Body
from typing import Annotated
from pydantic import BaseModel
from sonyflake import SonyFlake
import redis
import time
from datasketch import MinHash, MinHashLSH

app = FastAPI()
sf = SonyFlake()
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
lsh = MinHashLSH(num_perm=128, threshold=0.5, storage_config={
    'type': 'redis',
    'redis': {'host': '127.0.0.1', 'port': 6379}
}, prepickle=True)


class Partner(BaseModel):
    client_id: int
    partner_id: str
    status: str = 'Failed'


@app.post("/start", response_model=Partner)
async def start(interests: Annotated[list[str] | None, Body()] = None) -> Partner:
    client_id = sf.next_id()
    partner_id = ''

    minhash = MinHash()
    if not interests:
        return Partner(client_id=client_id, partner_id=partner_id)

    client_hash = f"user:{client_id}:interests:hash"
    minhash.update_batch([item.encode('utf-8') for item in interests])
    lsh.insert(client_hash, minhash)

    matches = lsh.query(minhash)
    matches.remove(client_hash)

    if not matches:
        # time.sleep() inside an async endpoint blocks the event loop,
        # so every other request on this worker waits too.
        time.sleep(5)

    matches = lsh.query(minhash)
    matches.remove(client_hash)

    if not matches:
        lsh.remove(client_hash)
        return Partner(client_id=client_id, partner_id=partner_id)

    lsh.remove(client_hash)
    lsh.remove(matches[0])
    return Partner(client_id=client_id, partner_id=matches[0], status="Success")
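
The query / sleep / query-again logic above can be expressed without blocking the event loop by awaiting `asyncio.sleep()` between attempts and giving up after a deadline. A minimal sketch; `poll_until` is a hypothetical helper, and in the endpoint `check` would wrap the `lsh.query()` call:

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def poll_until(check: Callable[[], Awaitable[Optional[T]]],
                     interval: float, timeout: float) -> Optional[T]:
    """Call `check` every `interval` seconds until it returns a value or `timeout` elapses."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        result = await check()
        if result is not None:
            return result
        remaining = deadline - loop.time()
        if remaining <= 0:
            return None  # the caller reports status "Failed"
        # asyncio.sleep yields to the event loop instead of blocking it.
        await asyncio.sleep(min(interval, remaining))
```

For example, `partner = await poll_until(find_partner, interval=1, timeout=5)`, returning `status="Failed"` when `partner` is `None`, where `find_partner` is a hypothetical async function wrapping the LSH query.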

I would appreciate any insights or suggestions on how to implement the waiting mechanism properly without hurting the performance and responsiveness of the application. Is there a recommended approach or best practice that keeps the API responsive for other users?

  • Please share any insights or best practices on implementing an efficient waiting mechanism in this scenario.
  • Any suggestions on optimizing the code or improving its responsiveness would be greatly appreciated.
  • Please provide resources or links to read more about it.

Thank you.

Abhijith Ea
  • If you really want to use sleep, use `await asyncio.sleep()` instead - that way your running coroutine will give up its processing time while waiting at least; a different solution would be to just make the client ask again every x seconds instead of sleeping - it'll make the implementation simpler on the server side. Or you can use a websocket or similar pubsub-ish "subscriptions" - subscribe to a given hash, then receive an event when another identical hash appears. – MatsLindh May 24 '23 at 12:45
  • @MatsLindh Thanks for the suggestion, I will look into pub/sub. Making the client ask every 5 seconds instead of sleeping would make the problem disappear, but I can't go that route. I'm trying to replicate the Omegle site, so I have to do it the way they do. – Abhijith Ea May 25 '23 at 04:35
  • Waiting five seconds in process and waiting five seconds to make a new request are the same thing from the user's perspective - it's just a five second delay in either case; in the latter case you just won't tie up any resources on the server side by waiting. – MatsLindh May 25 '23 at 07:50
  • @MatsLindh I am thinking about doing it with pub/sub. Do you have any resources I could look into? A Google search didn't get me anywhere. I appreciate your help. Thank you – Abhijith Ea May 26 '23 at 07:20
  • You might want to look at something like https://pypi.org/project/fastapi-websocket-pubsub/ and possibly using something like redis to handle out-of-web-context workers etc. – MatsLindh May 26 '23 at 08:02
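
The pub/sub idea from these comments can be sketched in-process with one `asyncio.Queue` per waiting client: the client subscribes to its own channel and the matcher publishes there when it finds a partner. `Broker` and `demo` are hypothetical names; with several worker processes you would swap this for Redis pub/sub or the fastapi-websocket-pubsub package linked above.

```python
import asyncio
from collections import defaultdict
from typing import DefaultDict, List


class Broker:
    """Hypothetical in-process pub/sub: one asyncio.Queue per subscriber."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[asyncio.Queue]] = defaultdict(list)

    def subscribe(self, topic: str) -> asyncio.Queue:
        queue: asyncio.Queue = asyncio.Queue()
        self._subscribers[topic].append(queue)
        return queue

    def unsubscribe(self, topic: str, queue: asyncio.Queue) -> None:
        # Call this when the waiting client disconnects.
        self._subscribers[topic].remove(queue)

    def publish(self, topic: str, message: dict) -> int:
        # Deliver to every current subscriber; returns how many received it.
        queues = self._subscribers.get(topic, [])
        for queue in queues:
            queue.put_nowait(message)
        return len(queues)


async def demo() -> dict:
    broker = Broker()
    # The waiting client subscribes to its own channel...
    inbox = broker.subscribe("user:1")
    # ...and the matcher publishes there once it pairs user:1 with user:2.
    broker.publish("user:1", {"status": "Success", "partner_id": "user:2"})
    # The handler simply awaits the event; no sleep-and-poll loop is needed.
    return await asyncio.wait_for(inbox.get(), timeout=1)
```

A WebSocket handler would `await inbox.get()` (with a timeout if it should eventually report "Failed") instead of re-querying every few seconds.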

1 Answer

From what I can tell, you're implementing a synchronous wait: `time.sleep()` inside an `async def` endpoint blocks the event loop, so every other request handled by that process stalls as well. You should wait asynchronously instead, either over a WebSocket or with HTTP long-polling. I think WebSockets are the better fit here, mainly because they give you bidirectional communication over a persistent connection. I've sketched an implementation below: `/start` only registers the user's interests, and the client then opens a WebSocket that reports its status until a partner is found:

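import asyncio
from typing import Annotated

from fastapi import Body, FastAPI, WebSocket
from pydantic import BaseModel
from sonyflake import SonyFlake
import redis
from datasketch import MinHash, MinHashLSH

app = FastAPI()
sf = SonyFlake()
r = redis.Redis(host='localhost', port=6379, decode_responses=True)
lsh = MinHashLSH(num_perm=128, threshold=0.5, storage_config={
    'type': 'redis',
    'redis': {'host': '127.0.0.1', 'port': 6379}
}, prepickle=True)

class Partner(BaseModel):
    client_id: int
    partner_id: str
    status: str = 'Failed'

def build_minhash(interests) -> MinHash:
    minhash = MinHash(num_perm=128)
    minhash.update_batch([item.encode('utf-8') for item in interests])
    return minhash

@app.post("/start", response_model=Partner)
async def start(interests: Annotated[list[str] | None, Body()] = None) -> Partner:
    client_id = sf.next_id()
    if not interests:
        return Partner(client_id=client_id, partner_id='')

    # MinHashLSH stores only band hashes, not the MinHash itself (it has no get()),
    # so keep the raw interests in Redis for the WebSocket handler to rebuild it.
    r.sadd(f"user:{client_id}:interests", *interests)
    lsh.insert(f"user:{client_id}:interests:hash", build_minhash(interests))

    # Registration only; the client then opens /ws/{client_id} to wait for a match.
    return Partner(client_id=client_id, partner_id='')

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await websocket.accept()
    client_hash = f"user:{client_id}:interests:hash"
    interests = r.smembers(f"user:{client_id}:interests")
    if not interests:
        await websocket.send_json({"status": "Error", "message": "Client ID not found"})
        await websocket.close()
        return
    minhash = build_minhash(interests)

    try:
        while True:
            # Another client may already have claimed us as its partner.
            partner_hash = r.get(f"user:{client_id}:partner")
            if partner_hash:
                await websocket.send_json({"status": "Success", "client_id": client_id, "partner_id": partner_hash})
                return

            matches = [m for m in lsh.query(minhash) if m != client_hash]
            if matches and client_hash in lsh:
                partner_hash = matches[0]
                # Not atomic across worker processes; production code needs a Redis lock here.
                lsh.remove(client_hash)
                lsh.remove(partner_hash)
                # Tell the partner's socket who it was matched with
                # (keys look like "user:<id>:interests:hash").
                partner_id = partner_hash.split(":")[1]
                r.set(f"user:{partner_id}:partner", client_hash)
                await websocket.send_json({"status": "Success", "client_id": client_id, "partner_id": partner_hash})
                return

            await websocket.send_json({"status": "Waiting"})
            # Yields to the event loop instead of blocking it like time.sleep() would.
            await asyncio.sleep(5)
    finally:
        # Drop the client from the index if it disconnected before being matched.
        if client_hash in lsh:
            lsh.remove(client_hash)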
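
The HTTP long-polling alternative mentioned above can avoid the five-second polling loop entirely: each waiting request awaits a Future that a later arrival resolves. This is a minimal sketch assuming a single worker process; `Matchmaker` is hypothetical, and simple interest overlap stands in for `lsh.query()`.

```python
import asyncio
from typing import Dict, Optional, Set, Tuple


class Matchmaker:
    """Hypothetical single-process matchmaker: waiting users park on a Future."""

    def __init__(self) -> None:
        self._waiting: Dict[str, Tuple[Set[str], asyncio.Future]] = {}

    def _find(self, interests: Set[str]) -> Optional[str]:
        # Stand-in for lsh.query(): first still-waiting user sharing any interest.
        for other, (their_interests, future) in self._waiting.items():
            if not future.done() and interests & their_interests:
                return other
        return None

    async def wait_for_partner(self, user: str, interests: Set[str],
                               timeout: float) -> Optional[str]:
        other = self._find(interests)
        if other is not None:
            # Pair immediately and wake up the request that was already waiting.
            _, future = self._waiting.pop(other)
            future.set_result(user)
            return other

        future = asyncio.get_running_loop().create_future()
        self._waiting[user] = (interests, future)
        try:
            # Suspends only this request; the event loop keeps serving others.
            return await asyncio.wait_for(future, timeout)
        except asyncio.TimeoutError:
            return None  # the endpoint would report status "Failed"
        finally:
            self._waiting.pop(user, None)
```

In the question's `/start` endpoint, `partner = await matchmaker.wait_for_partner(...)` could replace the `time.sleep(5)` branch, returning "Failed" when it yields `None`. With several worker processes the futures are not shared, so the wake-up would have to travel through Redis instead.
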
artemis