I have a small but peculiar task at hand, and I can't figure out the best way to implement a solution.
I have three workstations connected via 40 Gbps InfiniBand to a NAS running Ubuntu 20.04 LTS. The NAS is equipped with a 2 TB NVMe SSD as a write cache and 7 RAID0 arrays as the main storage.
The workstations write raw data to this NAS for later use; each machine produces around 6 TB of data files per day, with individual files ranging from 100 to 300 GB. To keep the network from getting congested, I have the workstations write to the NVMe cache first, and from there I plan to distribute the data files - exactly one file to each RAID0 array concurrently, to maximize disk I/O. For example, file1 goes to array0, file2 goes to array1, file3 goes to array2, and so on.
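In other words, the assignment I'm after is just a modulo round-robin over the seven arrays. A minimal sketch of the mapping I have in mind (the mount points and file names here are placeholders, not my real layout):

arrays = [f"/mnt/array{i}" for i in range(7)]   # the 7 RAID0 mount points (placeholder paths)
files = ["file1.dat", "file2.dat", "file3.dat", "file4.dat"]

for i, f in enumerate(files):
    dest = arrays[i % len(arrays)]   # file1 -> array0, file2 -> array1, ...
    print(f, "->", dest)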
Now I am writing a script on the NAS side (preferably run as a systemd service, though nohup would do) to monitor the cache and send the files to the RAID arrays.
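For reference, the kind of unit I have in mind is just a plain long-running service; everything here (the service name, the script path) is a placeholder, since I haven't settled on the layout yet:

[Unit]
Description=Distribute data files from NVMe cache to RAID arrays
After=local-fs.target

[Service]
Type=simple
ExecStart=/usr/bin/python3 /opt/nas/distribute_cache.py
Restart=on-failure

[Install]
WantedBy=multi-user.target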
Here's what I came up with; it is very close to my goal, thanks to this post.
import queue, threading, os, time
import shutil

bfr_drive = '/home/test_folder'  # cache
ext = ".dat"  # data file extension
array = 0  # simulated arrays as t0-t6
fileList = []  # list of files to be moved from cache to storage
destPath = '/home/test_folder/t'
fileQueue = queue.Queue()

class ThreadedCopy:
    totalFiles = 0
    copyCount = 0
    array = 0
    lock = threading.Lock()

    def __init__(self):
        for file_name in os.listdir(bfr_drive):
            if file_name.endswith(ext):
                fileList.append(os.path.join(bfr_drive, file_name))
        fileList.sort()
        self.totalFiles = len(fileList)
        print(str(self.totalFiles) + " files to copy.")
        self.threadWorkerCopy(fileList)

    def CopyWorker(self):
        global array
        while True:
            fileName = fileQueue.get()
            shutil.copy(fileName, destPath + str(array))
            array += 1
            if array > 6:
                array = 0
            fileQueue.task_done()
            with self.lock:
                self.copyCount += 1
                percent = (self.copyCount * 100) / self.totalFiles
                print(str(percent) + " percent copied.")

    def threadWorkerCopy(self, fileNameList):
        # global array
        for i in range(4):
            t = threading.Thread(target=self.CopyWorker)
            t.daemon = True
            t.start()
            # array += 1
            # if array > 6:
            #     array = 0
            print("current array is: " + str(array))  # prints array 0 four times, did not iterate
        for fileName in fileNameList:
            fileQueue.put(fileName)
        fileQueue.join()

ThreadedCopy()
Now, the Python script can successfully distribute files, but only after the first N files, where N is the number in for i in range(4). For example, if I set it to 4, the workers will use the same path (array0) for the first 4 files; only then do they start iterating through the arrays to 1, 2, 3, etc.
Would someone please point out how I can distribute the files correctly? I think I am heading in the right direction, but I can't wrap my head around why the workers are stuck on the same directory at the start.
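To illustrate, here is a toy sketch of what I suspect is happening (this is not my actual copy code; the sleep stands in for the slow shutil.copy()): every worker reads the shared counter before any of them has finished its copy and incremented it, so they all see 0.

import threading, time

counter = 0   # stands in for the global 'array' index

def worker(n):
    global counter
    seen = counter        # read the destination index
    time.sleep(0.1)       # stand-in for the slow shutil.copy()
    counter += 1          # incremented only after the "copy" finishes
    print("worker", n, "used array", seen)

threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
for t in threads: t.start()
for t in threads: t.join()
# typically prints "used array 0" four times, matching the symptom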
EDIT: In an earlier version of the code, the path iteration was in the thread-spawning function threadWorkerCopy. I have since moved it into the actual worker function, CopyWorker. The problem still stands.