I have some legacy code that needs to access the same data file across multiple threads and processes. I am trying to implement locking to protect the data.

Multithreaded

import contextlib
import threading

FILE_PATH = "foo.txt"

USE_GLOBAL_LOCK = False

if USE_GLOBAL_LOCK:
    global_lock = threading.Lock()
else:
    global_lock = contextlib.nullcontext()

def do_write_then_read(results) -> None:
    # Write to disk
    data = "FOO"
    with global_lock:
        with open(FILE_PATH, "w") as f:
            f.write(data)

    # Read from disk
    data = None
    with global_lock:
        with open(FILE_PATH, "r") as f:
            data = f.read()
    results.append(data)

def run_multithreaded() -> None:
    results = []

    threads = []
    for _ in range(10):
        threads.append(threading.Thread(target=do_write_then_read, args=[results]))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(results)

The output is usually correct:

 ['FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']

but sometimes data is missing. I believe this is due to a race condition between the read() and the write(): opening the file with "w" truncates it, so another thread can read it while it is still empty:

 ['', '', 'FOO', '', '', 'FOO', '', 'FOO', 'FOO', 'FOO']

Setting USE_GLOBAL_LOCK to True, so that file access is protected by a threading.Lock, indeed fixes the problem.

Multithreaded and multiprocess

However, running this across multiple processes again results in missing data.

Here's some test code that spawns subprocesses, each of which invokes the run_multithreaded() function above.

import fcntl
import subprocess

def run_multiprocess() -> None:
    processes = []
    for _ in range(3):
        CMD = "python3 -c 'import foo; foo.run_multithreaded()'"
        processes.append(subprocess.Popen(CMD, shell=True))
    for p in processes:
        p.wait()

Output with missing data:

['', '', 'FOO', '', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
['', '', '', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
['FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']

Here we redefine do_write_then_read() to add filesystem-based locks (flock) so that the file can be locked across multiple processes:

def do_write_then_read(results) -> None:
    # write
    data = "FOO"
    with global_lock:
        with open(FILE_PATH, "w") as f:
            # Acquire file lock
            fd = f.fileno()
            fcntl.flock(fd, fcntl.LOCK_EX)

            f.write(data)
            # Exiting the context closes file and releases lock

    # read
    data = None
    with global_lock:
        with open(FILE_PATH, "r") as f:
            # Acquire file lock
            fd = f.fileno()
            fcntl.flock(fd, fcntl.LOCK_EX)

            data = f.read()
            # Exiting the context closes file and releases lock
    results.append(data)

However, this doesn't fix the problem, and I can't figure out why, no matter what I try :P

I'm on Mac / Linux with Python 3.9.

jrc
  • I did not try, just briefly looked at the question and the code, but `with open(FILE_PATH, "w")` truncates the file, and in your last code fragment the lock is acquired only afterward. You would need to flock first, which is a problem because the file would have to be created already locked. It looks like the same file cannot be used for both data storage and locking. But as I wrote, I did not test anything. – VPfB May 26 '22 at 12:46
  • Maybe you can create the file atomically: write the data to a temp file and rename it when finished writing without error (a sketch of this follows these comments). Depends on the use case. – VPfB May 27 '22 at 05:19
  • Bingo! This is gold. (Unfortunately you won't get any points unless you post an answer. If it's not a biggie for you, I will accept @Homer512's answer which builds upon your insight.) – jrc May 27 '22 at 11:24
  • I was tired and didn't feel like writing a precise answer. Homer512 gave me credit for the comment, the rest is his work. It's perfectly fine to accept his answer. – VPfB May 27 '22 at 12:12
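
Here is a minimal sketch of the atomic-rename approach VPfB suggests above. The helper name atomic_write and the tempfile/os.replace details are my own illustration, not code from the question; os.replace is atomic on Mac/Linux as long as the temporary file and the destination live on the same filesystem.

import os
import tempfile

FILE_PATH = "foo.txt"

def atomic_write(path: str, data: str) -> None:
    # Write to a temporary file in the same directory, then atomically
    # rename it over the destination. Readers see either the old complete
    # file or the new complete file, never a half-written or truncated one.
    dir_name = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=dir_name)
    try:
        with os.fdopen(fd, "w") as tmp:
            tmp.write(data)
            tmp.flush()
            os.fsync(tmp.fileno())   # make sure the data is on disk before the rename
        os.replace(tmp_path, path)   # atomic rename over the destination
    except BaseException:
        os.unlink(tmp_path)          # clean up the temp file on failure
        raise

Note that this removes the truncation race (readers never see an empty file), but it does not provide mutual exclusion: concurrent writers still race and the last rename wins.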

2 Answers

As VPfB noted, you have to postpone the truncation until after you have acquired the lock. One option is to simply replace the threading.Lock with a multiprocessing.Lock; depending on how the processes get spawned, sharing that lock may be more or less easy.

Alternatively, you could use a separate lock file that is always acquired before the data file is opened (and therefore before it is truncated):

with open(FILE_PATH + ".lock", "w") as lockfile:
    fcntl.flock(lockfile.fileno(), fcntl.LOCK_EX)
    with open(FILE_PATH, "w") as f:
        f.write(data)

with open(FILE_PATH + ".lock", "w") as lockfile:
    fcntl.flock(lockfile.fileno(), fcntl.LOCK_SH)
    with open(FILE_PATH, "r") as f:
        data = f.read()

There is no good way to remove the lock file unless you know all processes have left this code section. We could come up with more complicated schemes but at that point switching to the multiprocessing module is probably easier.

Homer512
  • Thanks for expanding my comment; the "a" (append) mode will create the file if it does not exist (not good). Creating a file that is already locked is very probably not possible: https://stackoverflow.com/questions/17352655/how-do-i-atomically-create-a-locked-file-in-linux – VPfB May 26 '22 at 17:04
  • @VPfB I think creating a file if it doesn't exist was the intention. But it is hard to tell, seeing how the use case is not clear. If creating is not desired, changing the mode to "r+" would do the trick – Homer512 May 26 '22 at 19:35
  • The problem is that the created file is not locked and might get read by another thread. `r+` won't help, because it fails in the opposite case, when the file doesn't exist. But as you wrote, the use case is not that clear. – VPfB May 27 '22 at 05:17
  • @VPfB You are right. Didn't see that race condition. Changing my answer now – Homer512 May 27 '22 at 06:52
  • Right, create if not exist was the intention, but it's possible for me to create it up-front or use a second lock file. Unfortunately I don't think I can use multiprocessing because some of the Python scripts are run as cron jobs. – jrc May 27 '22 at 12:17

The problem is that within a single process a global lock is shared by all threads (they literally access the same lock object), but with multiple processes you have to explicitly pass the lock to the child processes; otherwise each process ends up with its own copy, not connected to the others and living in a different address space.

See this example from the multiprocessing docs for more information on that issue.
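
A minimal sketch of that idea, assuming the worker processes are started from a single parent via the multiprocessing module rather than via subprocess.Popen as in the question (the function names here are illustrative, not from the original code):

import multiprocessing

FILE_PATH = "foo.txt"

def worker(lock, results) -> None:
    # Every process receives the same Lock object, so writes and reads
    # are serialized across processes as well as across threads.
    data = "FOO"
    with lock:
        with open(FILE_PATH, "w") as f:
            f.write(data)
    with lock:
        with open(FILE_PATH, "r") as f:
            results.append(f.read())

def run_with_shared_lock() -> None:
    lock = multiprocessing.Lock()         # created once in the parent
    with multiprocessing.Manager() as manager:
        results = manager.list()          # shared list to collect the output
        procs = [
            multiprocessing.Process(target=worker, args=(lock, results))
            for _ in range(10)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        print(list(results))

if __name__ == "__main__":
    run_with_shared_lock()

If the scripts are started independently (e.g. as cron jobs, as mentioned in the question's comments), there is no common parent process to create the lock in, and the lock-file approach from the other answer is the more natural fit.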

Bastian Venthur