0

I have a small peculiar task at hand and I couldn't figure out how to best implement a solution.

I have three workstations that are connected to a NAS running Ubuntu 20.04 LTS, via InfiniBand with 40gbps of bandwidth. This NAS is equipped with a 2TB NVMe SSD as write cache, and 7 RAID0 units as the main storage.

These workstations will spit out raw data to this NAS for later use, each of these machines will spit out somewhere around 6TB of data files each day, sizing from 100 - 300 GB each file. In order to prevent the network gets too crowded, I have them output the data to the NVMe cache first, then I plan to distribute the data files from there - exactly one file to each of the RAID0 units concurrently to maximize disk IO. For example, file1 goes to array0, file2 goes to array1, file3 goes to array2, and so on.

Now I am writing a script on the NAS side (preferably as a systemd service, but I can do with nohup) to monitor the cache, and send the file to these RAID arrays.

Here's what I come up with, and it is very close to my goal thanks to this post.

import queue, threading, os, time
import shutil

bfr_drive = '/home/test_folder' # cache
ext = ".dat" # data file extension
array = 0 # simluated array as t0-t6
fileList = [] # list of files to be moved from cache to storage
destPath = '/home/test_folder/t'
fileQueue = queue.Queue()


class ThreadedCopy:
    totalFiles = 0
    copyCount = 0
    array = 0
    lock = threading.Lock()

    def __init__(self):
        for file_name in os.listdir(bfr_drive):
            if file_name.endswith(ext):
                fileList.append(os.path.join(bfr_drive, file_name))
                fileList.sort()

        self.totalFiles = len(fileList)

        print (str(self.totalFiles) + " files to copy.")
        self.threadWorkerCopy(fileList)


    def CopyWorker(self):
        global array
        while True:
            fileName = fileQueue.get()
            shutil.copy(fileName, destPath+str(array))
            array += 1
            if array > 6:
                array = 0
            fileQueue.task_done()

            with self.lock:
                self.copyCount += 1
                
                percent = (self.copyCount * 100) / self.totalFiles
                
                print (str(percent) + " percent copied.")

    def threadWorkerCopy(self, fileNameList):
        # global array
        for i in range(4):
            t = threading.Thread(target=self.CopyWorker)
            t.daemon = True
            t.start()
            # array += 1
            # if array > 6:
                # array = 0
            print ("current array is:" + str(array)) # output prints array0 for 4 times, did not iterate
            
          
        for fileName in fileNameList:
            fileQueue.put(fileName)
        fileQueue.join()

ThreadedCopy()

Now, the Python script can successfully distribute files, but only after what's after the number in for i in range(4). For example, if I set it to 4, then the workers will use the same path (array0) for the first 4 files, only then they will start iterating through the arrays to 1, 2, 3, etc.

Would someone please point out how can I distribute the files? I think I am heading in the right direction, however, I just can't wrap my head around why those workers are stuck with the same directory at the start.

EDIT: I had the earlier version of code with the path iteration is at the spawning process threadWorkerCopy. I now had it moved to the actual worker function which is CopyWorker. The problem still stands true.

  • Do I understand right that you want to copy the first file to `t0`, the 2nd file to `t1`, the 3rd file to `t2` etc, the 7th file to `t6` 8th file to `t0` and so on? Please [edit] your question to clarify this. – Bodo May 17 '21 at 18:05
  • When I test your code, the output of `print ("current array is:" + str(array))` iterates from 1 to 4, but all `CopyWorker` threads will then use the value 4 and *not* start iterating later. – Bodo May 17 '21 at 18:10
  • @Bodo Yes. I would like to have the file1 to array0, file2 to array1, file3 to array2, etc etc. – Michael Hwung May 17 '21 at 18:23
  • 1
    **Please [edit] your question** to add this clarification. All information about your problem should be in the question, not in comments. – Bodo May 17 '21 at 18:26
  • @Bodo That makes it even more strange. I thought what was happening was the iterated path isn't passed to the `CopyWorker`. But even the process of spawning the worker itself is an iteration loop. Just really, really strange. – Michael Hwung May 17 '21 at 18:27
  • It is difficult to test your program because it requires input files and output directories. You should create a [mre] that we can easily run to reproduce your problem. – Bodo May 17 '21 at 18:53

1 Answers1

1

The problem is that you don't generate new values of array in the worker threads but only when creating the threads in threadWorkerCopy.
The result will depend on the actual timing on your system. Every worker thread will use the value of array at the time when it reads the value. This may be concurrent to threadWorkerCopy incrementing the value or afterwards, so you may get files in different directories or all in the same directory.

To get a new number for every copying process, the number in array must be incremented in the worker threads. In this case you have to prevent concurrent access to array by two or more threads at the same time. You can implement this with another lock.

For testing I replaced the directory listing with a hard-coded list of example file names and replaced the copying with printing the values.

import queue, threading, os, time
import shutil

bfr_drive = '/home/test_folder' # cache
ext = ".dat" # data file extension
array = 0 # simluated array as t0-t6
fileList = [] # list of files to be moved from cache to storage
destPath = '/home/test_folder/t'
fileQueue = queue.Queue()


class ThreadedCopy:
    totalFiles = 0
    copyCount = 0
    array = 0
    lock = threading.Lock()
    lockArray = threading.Lock()

    def __init__(self):
        # directory listing replaced with hard-coded list for testing
        for file_name in [ 'foo.dat', 'bar.dat', 'baz.dat', 'a.dat', 'b.dat', 'c.dat', 'd.dat', 'e.dat', 'f.dat', 'g.dat' ] :
        #for file_name in os.listdir(bfr_drive):
            if file_name.endswith(ext):
                fileList.append(os.path.join(bfr_drive, file_name))
                fileList.sort()

        self.totalFiles = len(fileList)

        print (str(self.totalFiles) + " files to copy.")
        self.threadWorkerCopy(fileList)


    def CopyWorker(self):
        global array
        while True:
            fileName = fileQueue.get()

            with self.lockArray:
                myArray = array
                array += 1
                if array > 6:
                    array = 0

            # actual copying replaced with output for testing
            print('copying', fileName, destPath+str(myArray))
            #shutil.copy(fileName, destPath+str(myArray))

            with self.lock:
                self.copyCount += 1

                percent = (self.copyCount * 100) / self.totalFiles

                print (str(percent) + " percent copied.")

            # moved to end because otherwise main thread may terminate before the workers
            fileQueue.task_done()

    def threadWorkerCopy(self, fileNameList):
        for i in range(4):
            t = threading.Thread(target=self.CopyWorker)
            t.daemon = True
            t.start()

        for fileName in fileNameList:
            fileQueue.put(fileName)
        fileQueue.join()

ThreadedCopy()

This prints something like this (may change between different runs):

10 files to copy.
copying /home/test_folder\a.dat /home/test_folder/t0
10.0 percent copied.
copying /home/test_folder\baz.dat /home/test_folder/t3
20.0 percent copied.
copying /home/test_folder\b.dat /home/test_folder/t1
copying /home/test_folder\c.dat /home/test_folder/t4
copying /home/test_folder\bar.dat /home/test_folder/t2
copying /home/test_folder\d.dat /home/test_folder/t5
30.0 percent copied.
copying /home/test_folder\e.dat /home/test_folder/t6
40.0 percent copied.
copying /home/test_folder\f.dat /home/test_folder/t0
50.0 percent copied.
copying /home/test_folder\foo.dat /home/test_folder/t1
60.0 percent copied.
copying /home/test_folder\g.dat /home/test_folder/t2
70.0 percent copied.
80.0 percent copied.
90.0 percent copied.
100.0 percent copied.

Notes:

I moved the line fileQueue.task_done() to the end of CopyWorker. Otherwise I don't get all percentage output lines and sometimes an error message

Fatal Python error: could not acquire lock for <_io.BufferedWriter name='<stdout>'> at interpreter shutdown, possibly due to daemon threads

Maybe you should wait for the end of all worker threads before the end of the main thread.

I did not check if there are any further errors in the code.


Edit after the code in the question has been changed:

The modified code still contains the problem that the worker threads will still do some output after fileQueue.task_done() so that the main thread may end before the workers.

The modified code contains race conditions when the worker threads access array, so the behavior may be unexpected.

Bodo
  • 9,287
  • 1
  • 13
  • 29
  • Thank you very much! Your explanation is very thorough and easy to understand. I have never tried to code in a multithreading fashion, this is my first time trying. Can't do it without your help. – Michael Hwung May 17 '21 at 18:55