
I may have come across a classic race condition and would appreciate advice on how to resolve it.

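import sqlalchemy
from sqlalchemy import Column, ForeignKey, Integer, MetaData, Text

metadata = MetaData()  # the application's shared MetaData

VpnProfileTable = sqlalchemy.Table(
    "vpn_profile",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("profile", Text(), nullable=False),
    Column("server_id", ForeignKey("server.id"), nullable=False, index=True),
    # NULL means the profile is free; otherwise the id of the device using it
    Column("device_id", ForeignKey("device.id"), nullable=True, index=True),
)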

I have a VPN profile table in which each profile can be assigned to one device at a time. A profile should never be assigned to more than one device.

  1. With the query below, I make sure that only unused profiles (those without a device_id) are returned.
  2. I then fetch all of those profiles and pick one at random. Then I update the table to record that the chosen profile is now assigned to a device and must not be given to anybody else.

However, I believe a race condition occurs between reading and updating the record, because sometimes two users end up with the same profile.

import random
from typing import Optional

async def get_next_vpn_profile(
    self, server_id: str, device_id: str
) -> Optional[str]:
    query = (
        VpnProfileTable.select()
        .where(VpnProfileTable.c.server_id == server_id)
        .where(VpnProfileTable.c.device_id.is_(None))
    )
    async with engine.begin() as conn:
        records = (await conn.execute(query)).fetchall()
        profiles = [VpnProfile.parse_obj(i) for i in records]
        if not profiles:
            return None
        profile: VpnProfile = random.choice(profiles)
        # Race window: another worker can SELECT the same free row here,
        # before this UPDATE runs, and be handed the same profile.
        query = (
            VpnProfileTable.update()
            .where(VpnProfileTable.c.id == profile.id)
            .values(device_id=device_id)
        )
        await conn.execute(query)
        return profile.profile
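The check-then-act pattern in this function can be reproduced without a database. In the sketch below, a plain dict stands in for the vpn_profile table and `claim_unsafe` is a hypothetical name; it takes the first free id instead of calling `random.choice` so that the collision is reproducible. The `await` between the read and the write plays the role of the database round trip, and that is where the second request gets in:

```python
import asyncio
from typing import Dict, List, Optional, Tuple


async def claim_unsafe(table: Dict[int, Optional[str]], device_id: str) -> Optional[int]:
    """Hypothetical stand-in for get_next_vpn_profile: read, yield, then write."""
    # "SELECT ... WHERE device_id IS NULL"
    free = [pid for pid, dev in sorted(table.items()) if dev is None]
    # Simulated DB round trip: other coroutines run while this one waits.
    await asyncio.sleep(0)
    if not free:
        return None
    chosen = free[0]
    table[chosen] = device_id  # "UPDATE ... SET device_id = ..."
    return chosen


async def demo() -> Tuple[List[Optional[int]], Dict[int, Optional[str]]]:
    table: Dict[int, Optional[str]] = {1: None, 2: None}
    claimed = await asyncio.gather(
        claim_unsafe(table, "device-a"),
        claim_unsafe(table, "device-b"),
    )
    return claimed, table


claimed, table = asyncio.run(demo())
print(claimed)  # [1, 1]: both requests were handed profile 1
print(table)    # {1: 'device-b', 2: None}: device-a's assignment was overwritten
```

In the real app the two requests usually run in different Gunicorn workers rather than in one event loop, but the interleaving is the same: both SELECTs run before either UPDATE.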

How can I make sure this block of code is executed by only one request at a time, across all incoming requests (the app runs on Gunicorn/Uvicorn)? Or is there a better way? I'm thinking of a singleton or semaphores, but can't get my head around it.

UPDATE

As Marat mentioned in the comments, I believe the best approach is to use locking at the database level. I'm using Postgres, and I'm not sure whether I need nowait=True inside with_for_update().

        async with engine.begin() as conn:
            query = (
                VpnProfileTable.select()
                .where(VpnProfileTable.c.server_id == server_id)
                .where(VpnProfileTable.c.device_id.is_(None))
                .limit(1)  # without LIMIT, FOR UPDATE locks every matching row
                .with_for_update()
            )
            record = (await conn.execute(query)).first()
            if record:
                query = (
                    VpnProfileTable.update()
                    .where(VpnProfileTable.c.id == record.id)
                    .values(device_id=device_id)
                )
                await conn.execute(query)
                await conn.commit()  # redundant: engine.begin() also commits on exit

To summarise, I believe this gets the first available VPN profile without a device_id and locks it, so that other processes running the same SELECT ... FOR UPDATE have to wait until the lock is released.

Then, within the same transaction, I set the obtained VPN profile's device_id and commit the changes. (I'm not sure whether I need to commit explicitly when there is already an engine.begin() block; it should happen automatically.)

I can't think of a way to write unit tests for this scenario, so I hope someone can validate this. Is .with_for_update() enough to make other processes wait when they try to run the same select statement?

If they do wait, they won't get the same row, because by then it is already assigned to another device_id, which is exactly what I need.

Houman
  • Is there always a single process performing the action? Leveraging database concurrency control, pessimistic or optimistic, might be worth considering in addition to any local exclusive code execution. – user2864740 Jul 23 '21 at 21:59
  • That is, http://docs.sqlalchemy.org/en/latest/orm/session_transaction.html (pessimistic) or http://docs.sqlalchemy.org/en/latest/orm/versioning.html (optimistic) might be relevant. – user2864740 Jul 23 '21 at 22:04
  • You should use a SELECT ... FOR UPDATE, which locks the required rows. The answer with threads will not work, because a Python app in production typically runs in multiple processes, and thread locks are per process. – Marat Mkhitaryan Jul 23 '21 at 22:07
  • @user2864740 No, there are multiple workers involved. Gunicorn with Uvicorn workers runs the app in parallel and concurrently. Also, I'm using SQLAlchemy Core, so there is no session involved. – Houman Jul 23 '21 at 22:52
  • @MaratMkhitaryan Your suggestion sounds really good. Do you have a good example of how to do that with SQLAlchemy Core? I googled and found a solution from 2013 that no longer works. – Houman Jul 23 '21 at 23:11
  • @Houman https://docs.sqlalchemy.org/en/14/core/selectable.html?highlight=lock#sqlalchemy.sql.expression.GenerativeSelect.with_for_update You must first lock the rows, then do what you need with them. The locking and everything else must happen in one atomic transaction within the with statement. https://docs.sqlalchemy.org/en/14/orm/session_transaction.html#begin-once – Marat Mkhitaryan Jul 24 '21 at 08:27
  • @MaratMkhitaryan Thanks so much. I have now updated the question with what you suggested. Could you please check whether it's correct? – Houman Jul 24 '21 at 10:06
  • @Houman Check the PostgreSQL documentation about locks, https://www.postgresql.org/docs/9.5/sql-select.html, "The Locking Clause": "With NOWAIT, the statement reports an error, rather than waiting". If there are two requests and the first has acquired the lock, the second has two options: fail with an error, or wait for the lock to be released. I suppose you need to wait rather than fail with an error. – Marat Mkhitaryan Jul 24 '21 at 10:31
  • @MaratMkhitaryan Perfect. In this case I should use `nowait=False` so that it waits. Other than that, does the code in the Update section look correct to you? Thanks – Houman Jul 24 '21 at 11:16
  • Marat, there is another great option: ```With SKIP LOCKED, any selected rows that cannot be immediately locked are skipped. Skipping locked rows ... can be used to avoid lock contention with multiple consumers accessing a queue-like table.``` – Houman Jul 24 '21 at 11:25
  • @Houman I think you should not skip locked rows; just wait for the lock. The code looks OK and does the locking. – Marat Mkhitaryan Jul 24 '21 at 12:06
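The comments above mention optimistic concurrency as an alternative to SELECT ... FOR UPDATE. A minimal sketch of that idea, assuming only the vpn_profile columns shown earlier: pick a free candidate, then claim it with an UPDATE that also requires `device_id IS NULL`, and treat zero updated rows as a lost race. SQLite (through the standard `sqlite3` module) is used only so the example runs standalone; `make_demo_db` and `claim_profile` are hypothetical names, and the demo picks the lowest free id rather than a random one so the output is predictable.

```python
import sqlite3
from typing import Optional


def make_demo_db() -> sqlite3.Connection:
    """Hypothetical in-memory stand-in for the vpn_profile table."""
    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE vpn_profile ("
        " id INTEGER PRIMARY KEY, profile TEXT NOT NULL,"
        " server_id INTEGER NOT NULL, device_id TEXT)"
    )
    conn.executemany(
        "INSERT INTO vpn_profile (profile, server_id) VALUES (?, ?)",
        [("profile-a", 1), ("profile-b", 1)],
    )
    conn.commit()
    return conn


def claim_profile(
    conn: sqlite3.Connection, server_id: int, device_id: str, max_attempts: int = 5
) -> Optional[str]:
    """Optimistically claim a free profile, retrying if another request wins."""
    for _ in range(max_attempts):
        row = conn.execute(
            "SELECT id, profile FROM vpn_profile"
            " WHERE server_id = ? AND device_id IS NULL ORDER BY id LIMIT 1",
            (server_id,),
        ).fetchone()
        if row is None:
            return None  # no free profiles left on this server
        profile_id, profile = row
        # Compare-and-set: only succeeds if the row is still free right now.
        cur = conn.execute(
            "UPDATE vpn_profile SET device_id = ? WHERE id = ? AND device_id IS NULL",
            (device_id, profile_id),
        )
        conn.commit()
        if cur.rowcount == 1:
            return profile
        # rowcount == 0: someone claimed this row first; try another candidate.
    return None


conn = make_demo_db()
print(claim_profile(conn, 1, "device-1"))  # profile-a
print(claim_profile(conn, 1, "device-2"))  # profile-b
print(claim_profile(conn, 1, "device-3"))  # None

# A stale claim on a row that is already taken updates nothing,
# which is how a request detects that it lost the race.
cur = conn.execute(
    "UPDATE vpn_profile SET device_id = ? WHERE id = ? AND device_id IS NULL",
    ("device-x", 1),
)
print(cur.rowcount)  # 0
```

In the SQLAlchemy Core version, the same guard would presumably be an extra `.where(VpnProfileTable.c.device_id.is_(None))` on the update, with the result's `rowcount` checked. On PostgreSQL at the default READ COMMITTED isolation level, a concurrent UPDATE of the same row waits for the first one to commit and then re-checks its WHERE clause, so only one request sees a row count of 1.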

1 Answer


Wrap your code in a mutex lock. In Python this can be done using multiprocessing.Lock; for example:

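from multiprocessing import Lock
from typing import Optional

# Module-level lock. Gunicorn workers only share it if it is created before
# they are forked, i.e. when the app is loaded with --preload.
mutex = Lock()

async def get_next_vpn_profile(
    self, server_id: str, device_id: str
) -> Optional[str]:
    ...

    # Lock.acquire() is a blocking call, so while a request waits here
    # this worker's whole event loop is blocked too.
    with mutex:
        async with engine.begin() as conn:
            ...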

Mutex locks typically don't work when multiple processes are used; however, this answer clarifies that Gunicorn workers can share a lock created with multiprocessing.Lock.
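The premise of this answer can be checked in isolation: a `multiprocessing.Lock` created in the parent before forking is inherited by every child, so it serializes a read-modify-write across processes. The sketch below assumes a POSIX system (it requests the `fork` start method explicitly, which is also how Gunicorn starts its workers); `worker` and `run` are hypothetical names, and a shared integer stands in for the profile table.

```python
import multiprocessing as mp


def worker(lock, counter, iterations: int) -> None:
    for _ in range(iterations):
        with lock:
            # Read-modify-write that would lose updates without the lock.
            current = counter.value
            counter.value = current + 1


def run(num_workers: int = 4, iterations: int = 1000) -> int:
    ctx = mp.get_context("fork")  # POSIX only; children inherit the parent's lock
    lock = ctx.Lock()  # created in the parent *before* any child is forked
    counter = ctx.Value("i", 0, lock=False)  # raw shared int, guarded only by `lock`
    procs = [
        ctx.Process(target=worker, args=(lock, counter, iterations))
        for _ in range(num_workers)
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(run())  # 4000
```

The same caveat carries over to Gunicorn: the lock is only shared if it exists before the workers are forked, and it only coordinates processes on one machine, whereas a database row lock covers every worker that talks to the database.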

KernelDeimos
  • Better to release in a `finally:` block so it gets released even when an exception is thrown. – Charles Duffy Jul 23 '21 at 21:58
  • @CharlesDuffy good point, updated my answer – KernelDeimos Jul 23 '21 at 22:01
  • This is only sufficient if the access is guarded within a single process. Even if that is currently the case, it's worth calling out. – user2864740 Jul 23 '21 at 22:02
  • Locks are also context managers, so you can do `with mutex: ...` to simplify this. – kaya3 Jul 23 '21 at 22:08
  • It looks like that might indeed be the case: https://stackoverflow.com/questions/18213619 I'll edit my answer to use `multiprocessing.Lock` instead, but perhaps this question should be marked as a duplicate. – KernelDeimos Jul 23 '21 at 22:08
  • You must use asyncio.Lock, not the threading one, and acquire it with an `async with` statement; try/except makes everything hard to read. – Marat Mkhitaryan Jul 23 '21 at 22:10
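Following the last comment, `asyncio.Lock` is the primitive for serializing coroutines inside one event loop, i.e. within a single Uvicorn worker process. A minimal sketch with hypothetical names (`claim`, `demo`) and a dict standing in for the table; like a thread lock, it does not coordinate separate Gunicorn workers, which is why the question's update moves to database-level locking:

```python
import asyncio
from typing import Dict, List, Optional, Tuple


async def claim(
    table: Dict[int, Optional[str]], device_id: str, lock: asyncio.Lock
) -> Optional[int]:
    # Only one coroutine at a time runs the check-then-act below.
    async with lock:
        free = [pid for pid, dev in sorted(table.items()) if dev is None]
        await asyncio.sleep(0)  # simulated DB round trip; others wait on the lock
        if not free:
            return None
        table[free[0]] = device_id
        return free[0]


async def demo() -> Tuple[List[Optional[int]], Dict[int, Optional[str]]]:
    lock = asyncio.Lock()  # created inside the running event loop
    table: Dict[int, Optional[str]] = {1: None, 2: None}
    claimed = await asyncio.gather(
        *(claim(table, f"device-{n}", lock) for n in range(3))
    )
    return claimed, table


claimed, table = asyncio.run(demo())
print(claimed)  # [1, 2, None]: each free profile is handed out once
print(table)    # {1: 'device-0', 2: 'device-1'}
```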