There is a small gotcha in the accepted answer from @davidism. The multiprocessing.Value
is accessed outside of the lock, so there is still a chance for duplicate values if you are unlucky.
Here is an example showing that collision. It also shows how this collision is possible if you are using asynchronous code (asyncio has its own locking mechanisms).
import asyncio
import concurrent.futures
import time
from multiprocessing import Value
# Sleep timings chosen so that values read outside of the lock context manager
# can collide. Each entry is unpacked into the two positional sleep arguments
# of the increment functions.
TIMINGS = [
    (0, 0),
    (1, 1),
    (0, 2),
]
# Counter created with typecode "i" and an initial value of 0.
counter = Value("i", 0)
def incr_counter(pre_incr_sleep, pre_return_sleep):
    """Increment the shared counter, then read it back outside of the lock.

    Args:
        pre_incr_sleep: Seconds to sleep before taking the lock.
        pre_return_sleep: Seconds to sleep after the lock has been released
            and before the counter is read.

    Returns:
        The counter value at the time of the read. The read is not protected
        by the lock, so it can include increments made by other callers in
        the meantime, and two callers can receive the same value.
    """
    time.sleep(pre_incr_sleep)
    with counter.get_lock():
        counter.value += 1
    # Deliberately outside the lock: this unprotected window is what allows
    # duplicate return values.
    time.sleep(pre_return_sleep)
    return counter.value
def incr_counter_context(pre_incr_sleep, pre_return_sleep):
    """Increment the shared counter and read it back while still holding the lock.

    Args:
        pre_incr_sleep: Seconds to sleep before taking the lock.
        pre_return_sleep: Seconds to sleep after the increment, with the lock
            still held, before the counter is read.

    Returns:
        The counter value read inside the locked context, i.e. before any
        other holder of the lock can increment it again.
    """
    time.sleep(pre_incr_sleep)
    with counter.get_lock():
        counter.value += 1
        # Sleep and read inside the lock so the increment and the read form
        # a single critical section.
        time.sleep(pre_return_sleep)
        return counter.value
async def aincr_counter(pre_incr_sleep, pre_return_sleep):
    """Increment the shared counter, then read it back outside of the lock.

    Asynchronous variant: the sleeps are ``asyncio.sleep`` awaits, so other
    coroutines can run (and increment the counter) while this one waits.
    This can return duplicate values in some scenarios.

    Args:
        pre_incr_sleep: Seconds to await before taking the lock.
        pre_return_sleep: Seconds to await after the lock has been released
            and before the counter is read.

    Returns:
        The counter value at the time of the unprotected read.
    """
    await asyncio.sleep(pre_incr_sleep)
    with counter.get_lock():
        counter.value += 1
    # Outside the lock: other coroutines may increment before the read below.
    await asyncio.sleep(pre_return_sleep)
    return counter.value
async def aincr_counter_context(pre_incr_sleep, pre_return_sleep):
    """Increment the shared counter and read it back inside the locked context.

    Asynchronous variant that awaits while the lock is held. One might expect
    this to never hand out duplicate values, but the lock created by
    ``multiprocessing.Value`` by default is recursive, and coroutines on one
    event loop share a thread, so another coroutine can presumably enter the
    same lock during the await and increment the counter before the read.

    Args:
        pre_incr_sleep: Seconds to await before taking the lock.
        pre_return_sleep: Seconds to await after the increment, with the lock
            still held, before the counter is read.

    Returns:
        The counter value read inside the locked context.
    """
    await asyncio.sleep(pre_incr_sleep)
    with counter.get_lock():
        counter.value += 1
        # The await yields to the event loop while the lock is still held.
        await asyncio.sleep(pre_return_sleep)
        return counter.value
async def _gather_increments(incr):
    """Run coroutine function *incr* once per TIMINGS entry, concurrently.

    Returns the list of results in the same order as TIMINGS.
    """
    return await asyncio.gather(*(incr(pre, post) for pre, post in TIMINGS))


def main():
    """Run the process-pool and asyncio demonstrations and print their results."""
    print("*** Showing that multiprocessing.Value is multiprocess safe ***")
    # One worker per timing entry so all submissions really overlap, no matter
    # how many CPUs the machine has.
    # NOTE: the workers only share `counter` when child processes inherit it
    # (fork start method). Under spawn/forkserver each worker re-imports this
    # module and would get its own counter -- verify on your platform.
    with concurrent.futures.ProcessPoolExecutor(max_workers=len(TIMINGS)) as executor:
        print("Testing concurrent returning inside of lock...")
        futures = [
            executor.submit(incr_counter_context, *timings) for timings in TIMINGS
        ]
        print("Returning value inside of lock context won't cause duplicates when using non-asyncronous executor")
        print([future.result() for future in futures])

        print("Testing concurrent returning outside lock...")
        futures = [executor.submit(incr_counter, *timings) for timings in TIMINGS]
        print("Returning value outside of lock context can cause duplicate values")
        print([future.result() for future in futures])

    print("*** Showing that multiprocessing.Value is not async safe ***")
    # asyncio.run creates and closes the event loop itself, and gather() is
    # called from inside a running loop via the helper above.
    print("Testing async returning outside of lock...")
    print(asyncio.run(_gather_increments(aincr_counter)))
    print("Testing async returning inside of lock...")
    print(asyncio.run(_gather_increments(aincr_counter_context)))


# Guard the entry point so worker processes that import this module do not
# re-run the demonstration.
if __name__ == "__main__":
    main()
Here is the output of the above:
*** Showing that multiprocessing.Value is multiprocess safe ***
Testing concurrent returning inside of lock...
Returning value inside of lock context won't cause duplicates when using non-asyncronous executor
[1, 3, 2]
Testing concurrent returning outside lock...
Returning value outside of lock context can cause duplicate values
[4, 6, 6]
*** Showing that multiprocessing.Value is not async safe ***
Testing async returning outside of lock...
[8, 9, 9]
Testing async returning inside of lock...
[11, 12, 12]
Luckily, you are using Flask, which is synchronous, so the async problem isn't a concern for your use case.
So, I would suggest changing the accepted answer to store the value inside the lock context and then release the lock ASAP. If you were to call jsonify or anything else inside the context, you would keep holding the lock while doing operations which don't require it.
@app.route('/')
def index():
    """Increment the shared counter and return the new value as JSON.

    The value is captured while the lock is held, so it cannot be changed by
    another holder of the lock between the increment and the read. The
    response is built only after the lock has been released.
    """
    with counter.get_lock():
        counter.value += 1
        # save the value ASAP rather than passing to jsonify
        # to keep lock time short
        unique_count = counter.value
    return jsonify(count=unique_count)