I may have run into a classic race condition and need advice on how to resolve it.
VpnProfileTable = sqlalchemy.Table(
    "vpn_profile",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("profile", Text(), nullable=False),
    Column("server_id", ForeignKey("server.id"), nullable=False, index=True),
    Column("device_id", ForeignKey("device.id"), nullable=True, index=True),
)
I have a VPN profile table in which each profile can be assigned to one device at a time. A profile should never be assigned to more than one device.
- With the query below, I make sure that only unused profiles (those without a device id) are returned.
- Then I fetch all of those profiles and randomly pick one. After that, I update the table to mark the chosen profile as assigned to a device, so that it is not given to anybody else.
However, I believe a race condition occurs between reading and updating the record, and sometimes two users end up with the same profile.
async def get_next_vpn_profile(
    self, server_id: str, device_id: str
) -> Optional[str]:
    query = (
        VpnProfileTable.select()
        .where(VpnProfileTable.c.server_id == server_id)
        .where(VpnProfileTable.c.device_id == None)
    )
    async with engine.begin() as conn:
        records = (await conn.execute(query)).fetchall()
        profiles = []
        if records and len(records) > 0:
            profiles = [VpnProfile.parse_obj(i) for i in records]
        if profiles:
            profile: VpnProfile = random.choice(profiles)
            query = (
                VpnProfileTable.update()
                .where(VpnProfileTable.c.id == profile.id)
                .values(device_id=device_id)
            )
            await conn.execute(query)
            return profile.profile
        else:
            return None
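The interleaving I suspect can be reproduced in miniature without a database. The following is only a sketch: the `profiles` dict, the `assigned` list, and the `claim` coroutine are hypothetical stand-ins for the table, the handed-out profiles, and the function above. In the real app the overlap would happen between separate connections or worker processes rather than inside one event loop, but the read-then-write pattern is the same.

```python
import asyncio

# Hypothetical in-memory stand-in for the vpn_profile table: id -> device_id.
profiles = {1: None}
# (device_id, profile_id) pairs that were handed out.
assigned = []


async def claim(device_id: str) -> None:
    # Step 1: read the unassigned profiles (the SELECT).
    free = [pid for pid, dev in profiles.items() if dev is None]
    # Yield to the event loop, as a real database round trip would.
    await asyncio.sleep(0)
    if free:
        pid = free[0]
        # Step 2: write the assignment (the UPDATE), based on a stale read.
        profiles[pid] = device_id
        assigned.append((device_id, pid))


async def main() -> None:
    # Two requests arrive at the same time for the single free profile.
    await asyncio.gather(claim("device-a"), claim("device-b"))


asyncio.run(main())
print(assigned)
```

Both callers read the profile as free before either one writes, so both believe they own it.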
How can I make sure that only one request at a time, across all incoming requests, can execute this block of code? (The app runs on Gunicorn/Uvicorn.) Or is there a better way? I'm thinking of a singleton or semaphores, but can't get my head around it.
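An in-process singleton or asyncio semaphore would probably not be enough here, because Gunicorn typically runs several worker processes that share no memory. One database-level alternative is to make the UPDATE itself conditional on the profile still being unassigned, and to check how many rows it changed. Below is a minimal sketch of that idea, using the standard-library `sqlite3` module purely as a stand-in for Postgres; the helper name `claim_profile` and the sample data are assumptions, not part of my app.

```python
import sqlite3
from typing import Optional


def claim_profile(
    conn: sqlite3.Connection, server_id: str, device_id: str
) -> Optional[str]:
    """Hypothetical helper: claim one free profile, or return None."""
    while True:
        row = conn.execute(
            "SELECT id, profile FROM vpn_profile "
            "WHERE server_id = ? AND device_id IS NULL LIMIT 1",
            (server_id,),
        ).fetchone()
        if row is None:
            return None  # no free profile left
        profile_id, profile = row
        # The extra "device_id IS NULL" condition makes the update a no-op
        # if another request claimed the row after our SELECT.
        cur = conn.execute(
            "UPDATE vpn_profile SET device_id = ? "
            "WHERE id = ? AND device_id IS NULL",
            (device_id, profile_id),
        )
        conn.commit()
        if cur.rowcount == 1:
            return profile
        # rowcount == 0: we lost the race, so try the next free profile.


# Sample data: a single free profile for one server.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE vpn_profile ("
    "id INTEGER PRIMARY KEY, profile TEXT NOT NULL, "
    "server_id TEXT NOT NULL, device_id TEXT)"
)
conn.execute(
    "INSERT INTO vpn_profile (profile, server_id) VALUES ('profile-1', 's1')"
)
conn.commit()

first = claim_profile(conn, "s1", "device-a")
second = claim_profile(conn, "s1", "device-b")
print(first, second)
```

With this shape, correctness does not depend on which process runs the code, only on the database applying each UPDATE atomically.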
UPDATE
As Marat has mentioned in the comments, I believe the best approach would be to use locking at the database level. I'm using Postgres, so I'm not sure if I need nowait=True inside with_for_update().
async with engine.begin() as conn:
    query = (
        VpnProfileTable.select()
        .where(
            VpnProfileTable.c.device_id == None,
        )
        .with_for_update()
    )
    record = (await conn.execute(query)).first()
    if record:
        query = (
            VpnProfileTable.update()
            .where(VpnProfileTable.c.id == record.id)
            .values(device_id=device_id)
        )
        await conn.execute(query)
        await conn.commit()
To summarise, I believe this selects the first available VPN profile that has no device_id and locks it, so that other processes hopefully wait at this point until the row can be read again.
Then, within the same transaction, I assign the selected VPN profile to the given device_id and commit the changes. (I'm not sure whether I need to commit explicitly when there is already an async with engine.begin() block; it should happen automatically.)
I can't think of a way to write unit tests for this scenario, so I hope someone can validate this. Is .with_for_update() enough to make other processes wait when they try to run the same select statement?
If they wait, they won't get the same row, because by then it is already assigned to another device_id, which is exactly what I need.
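As far as I understand the Postgres locking clauses, a plain FOR UPDATE makes a second transaction wait for the row lock, NOWAIT makes it raise an error instead of waiting, and SKIP LOCKED makes it skip rows that are already locked and move on to the next one. A variant I'm considering therefore adds a LIMIT and skip_locked=True, so that concurrent requests each lock a different free row. The sketch below only compiles the statement to SQL to show what would be sent; the table is a simplified stand-in for VpnProfileTable (foreign keys omitted) and the server id value is made up.

```python
from sqlalchemy import Column, Integer, MetaData, Table, Text
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# Simplified stand-in for VpnProfileTable (foreign keys omitted).
vpn_profile = Table(
    "vpn_profile",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("profile", Text(), nullable=False),
    Column("server_id", Text(), nullable=False),
    Column("device_id", Text(), nullable=True),
)

query = (
    vpn_profile.select()
    .where(vpn_profile.c.server_id == "server-1")  # made-up value
    .where(vpn_profile.c.device_id.is_(None))
    .limit(1)  # lock a single row instead of every free row
    .with_for_update(skip_locked=True)
)

# Render the statement for Postgres without connecting to a database.
sql = str(query.compile(dialect=postgresql.dialect()))
print(sql)
```

Without the LIMIT, my understanding is that the SELECT could lock every unassigned row even though only the first one is used.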