
Given the following Python3 code, with threading:

import os
import queue
import threading
import time

class main:
    def __init__(self):
        self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+")
        self.hashlist = queue.Queue()
        self.filelist = queue.Queue()
        self.top = '/home/'
        for y in range(12):
            self.u = threading.Thread(target=self.md5hash)
            self.u.daemon = True
            self.u.start()
        for x in range(4):
            self.t = threading.Thread(target=self.threader)
            self.t.daemon = True
            self.t.start()
        main.body(self)

    def body(self):
        start = time.time()
        self.text.write("Time now is " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
        for root, dirs, files in os.walk(self.top):
            for f in files:
                path = os.path.join(root,f)
                self.filelist.put(path)
        self.t.join()
        self.u.join()
        self.text.write("Total time taken     : " + str(time.time() - start) + "\n")
        print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")

    def md5hash(self):
        while True:
            entry = self.filelist.get()
            # ... hashing code omitted in the original post; it produces finalhash ...
            lists = finalhash + ',' + entry
            self.hashlist.put(lists)
            self.filelist.task_done()

    def compare(self, hashed, path):
        f = open(os.getcwd() + "/database.csv", 'r')
        for row in f:
            if row.split(':')[1] == hashed:
                print("Suspicious File!")
                print("Suspecfs: " + row.split(':')[2] + "File name : " + path)

    def threader(self):
        while True:
            item = self.hashlist.get()
            hashes = item.split(',')[0]
            path = item.split(',')[1]
            self.compare(hashes, path)
            self.hashlist.task_done()

main()

Problem 1: In def body(self), there is the line self.text.write("Time now is ..."). This line does not appear in the log file that is created.

Problem 2: In def compare(self, hashed, path), there are lines that print "Suspicious File!" and the file path every time there is a hash match. The output always comes out of order, as the 4 threads t fight over who prints first. For this, I think I need to know how to make Python threads run print commands sequentially instead of however they like. How?

Problem 3: In def body(self), there are the lines self.u.join() and self.t.join(). To the best of my knowledge, join() waits for the thread to terminate before continuing. Neither thread terminates.

Additional information 1: I am writing multi-threading as I need the code to be converted to multi-processing later.

Additional information 2: Please do let me know if I have misunderstood any commands/syntax in my code as you are skimming through.

zwer
Timothy Wong
  • Your code seems to be misformatted: `body`, `md5hash`, `compare` and `threader` are defined outside `main`, i.e. as functions rather than methods. Is it a copying mistake, or is that how your code really looks? – Błotosmętek Jul 08 '17 at 10:39
  • Sorry, it is a formatting mistake. – Timothy Wong Jul 08 '17 at 11:48
  • Re 1: you neither close your log nor flush it. This is probably why the line written to it never appears in the file. Try adding `self.text.flush()` after the write. – Błotosmętek Jul 08 '17 at 11:56
  • Whether threading or multiprocessing, managing printing to one file from multiple points generally calls for a print queue (a pipe or queue), with one process/thread managing the printing. I would recommend jumping to multiprocessing sooner rather than later, because similar (but different) issues will come up there. Rather than solving problems twice, do it once. – Ron Norris Jul 08 '17 at 12:00

1 Answer

Problem 1: You're writing to a buffered file object. The data reaches the actual file only when the buffer fills up, the file handle is closed, or you explicitly call flush() on it (i.e. self.text.flush()).
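To make the buffering visible, here is a minimal sketch. The function name demo_flush and the temporary directory are assumptions made for the illustration; only the file name comes from the question.

```python
import os
import tempfile


def demo_flush():
    """Return the on-disk size of a log file before and after flush()."""
    path = os.path.join(tempfile.mkdtemp(), "FileScanLogs.txt")
    log = open(path, "a+")
    log.write("Time now is ...\n")        # still sitting in the in-memory buffer
    size_before = os.path.getsize(path)   # what is on disk so far
    log.flush()                           # hand the buffered text to the OS
    size_after = os.path.getsize(path)
    log.close()                           # close() would also have flushed
    return size_before, size_after


if __name__ == "__main__":
    print(demo_flush())
```

In real code, opening the log with a with block is usually the simplest way to guarantee that it is flushed and closed, even if an exception occurs.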

Problem 2: You have to choose: either your code executes in parallel (it doesn't, but we'll get to that) and you lose the order of execution, or it executes serially and keeps the order. If you want to run multiple threads, it makes little sense to have them execute one after another, because then you're not executing your code in parallel and you might as well run everything in the main thread.

If you only want to control the output to STDOUT, provided it doesn't interfere with the thread execution, you can capture what you want to print and have it printed at the end under a mutex (so only one thread writes at a time), or even pipe it back to the main thread and have it manage access to STDOUT. A simple mutex example would be:

PRINT_MUTEX = threading.Lock()  # class attribute of main, shared by all threads

def compare(self, hashed, path):  # never mind the inefficiency, we'll get to that later
    out = []  # hold our output buffer
    with open(os.getcwd() + "/database.csv", 'r') as f:
        for row in f:
            row = row.split(':')
            if row[1] == hashed:
                out.append("Suspicious File!")
                out.append("Suspecfs: " + row[2] + "File name : " + path)
    if out:
        with self.PRINT_MUTEX:  # use a lock to print out the results
            print("\n".join(out))

This will not keep the thread execution in order (nor should you attempt to, lest you defeat the purpose of 'parallel' execution), but at least the threads will output their compare results one at a time instead of interleaving them. If you want your main thread/process to control STDOUT, especially since you want to turn this into multiprocessing code, check this answer.
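The other option mentioned above, a single thread that owns the output, could be sketched roughly as follows. All names here (printer, worker, run, _DONE) and the example paths are hypothetical and not part of the original code.

```python
import queue
import threading

_DONE = object()  # hypothetical sentinel that tells the printer to stop


def printer(messages, sink):
    """Only this thread writes output, so reports never interleave."""
    while True:
        msg = messages.get()
        if msg is _DONE:
            break
        sink(msg)


def worker(n, messages):
    # Build the complete report first, then hand it over in one piece.
    messages.put("Suspicious File!\nFile name : /tmp/example%d" % n)


def run(sink=print, n_workers=4):
    messages = queue.Queue()
    out = threading.Thread(target=printer, args=(messages, sink))
    out.start()
    workers = [threading.Thread(target=worker, args=(i, messages))
               for i in range(n_workers)]
    for w in workers:
        w.start()
    for w in workers:
        w.join()
    messages.put(_DONE)  # every worker is done, so the printer may stop
    out.join()


if __name__ == "__main__":
    run()
```

The same shape carries over to multiprocessing by swapping in a multiprocessing queue, which is one reason to prefer it over a lock if the code is going to be converted later.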

Problem 3: Your threads never exit because they are stuck in a while True loop; until you break out of it, the threads keep running. I don't know the reasoning behind the way you've structured the code, but if you're trying to parallelize file listing (main thread), reading and hashing (md5hash threads), and comparing (threader threads), you presumably want to stop hashing when there are no more files and stop comparing when there are no more hashes. Queue.task_done() cannot do that for you: it only tells the queue that one retrieved item has been fully processed, which matters solely to code blocked in a Queue.join() call (yours is not), and it never makes a worker leave its loop.
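For context, this is roughly what task_done() is meant to be paired with. It is a sketch only: the name process_all and the doubling "work" are made up for the illustration. Note that the workers still never leave their loops; the program can only finish because they are daemon threads.

```python
import queue
import threading


def process_all(items, n_workers=4):
    q = queue.Queue()
    results = []
    lock = threading.Lock()

    def work():
        while True:                    # the worker itself never exits
            item = q.get()
            with lock:
                results.append(item * 2)
            q.task_done()              # one retrieved item is fully handled

    for _ in range(n_workers):
        threading.Thread(target=work, daemon=True).start()
    for item in items:
        q.put(item)
    q.join()   # returns once task_done() has been called for every put()
    return sorted(results)


if __name__ == "__main__":
    print(process_all([1, 2, 3]))
```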

To stop the workers you should use a threading.Event signal, but if you want to stick with queue.Queue only, you can create a special value that denotes the end of the queue, put it in the queue when there's nothing more to process, and have your threads exit their loops when they encounter it. Let's first fix a big oversight in your code: you're not storing references to your threads at all. Each new thread overwrites the previous one in self.u / self.t, so you can only ever join the last one and cannot really control the execution flow. Instead, store all the references in a list. Also, if you're going to wait for everything to finish, don't use daemon threads:

def __init__(self):
    self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+")  # consider os.path.join()
    self.hashlist = queue.Queue()
    self.filelist = queue.Queue()
    self.hashers = []  # hold the md5hash thread references
    self.comparators = []  # hold the threader thread references
    self.top = '/home/'
    for _ in range(12):  # you might want to consider a ThreadPool instead
        t = threading.Thread(target=self.md5hash)
        t.start()
        self.hashers.append(t)
    for _ in range(4):
        t = threading.Thread(target=self.threader)
        t.start()
        self.comparators.append(t)
    main.body(self)

Now we can modify the main.body() method so that it adds the aforementioned special values to the end of our queues so that the worker threads know when to stop:

QUEUE_CLOSE = object()  # class attribute of main: a 'special' object to denote end-of-data in our queues

def body(self):
    start = time.time()
    self.text.write("Time:  " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n")
    for root, dirs, files in os.walk(self.top):
        for f in files:
            path = os.path.join(root, f)
            self.filelist.put(path)
    self.filelist.put(self.QUEUE_CLOSE)  # no more files, signal the end of the filelist
    for t in self.hashers:  # let's first wait for our hashing threads to exit
        t.join()
    # since we're not going to be receiving any new hashes, we can...
    self.hashlist.put(self.QUEUE_CLOSE)  # ... signal the end of the hashlist as well
    for t in self.comparators:  # let's wait for our comparator threads to exit
        t.join()
    self.text.write("Total: " + str(time.time() - start) + "\n")
    self.text.close()  # close the log file (this will also flush the previous content)
    print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt")

And consequently we need to modify the worker threads to exit when they encounter the end of the queue:

def md5hash(self):
    while True:  # a Queue object is always truthy, so loop until the sentinel arrives
        entry = self.filelist.get()
        if entry is self.QUEUE_CLOSE:  # end of queue encountered
            self.filelist.put(self.QUEUE_CLOSE)  # put it back for the other threads
            break  # break away the processing
        finalhash = whatever_is_your_hash_code(entry)
        lists = finalhash + ',' + entry
        self.hashlist.put(lists)

def threader(self):
    while True:
        item = self.hashlist.get()
        if item is self.QUEUE_CLOSE:  # end of queue encountered
            self.hashlist.put(self.QUEUE_CLOSE)  # put it back for the other threads
            break  # break away the queue
        hashes, path = item.split(',', 1)  # split only once: paths may contain commas
        self.compare(hashes, path)

Now if you run it, provided your unlisted hashing part works properly, everything should eventually exit.
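For comparison, the threading.Event route mentioned earlier could look something like the sketch below. The names hash_all and no_more_input are hypothetical, and str.upper() merely stands in for the real hashing.

```python
import queue
import threading


def hash_all(paths, n_workers=4):
    todo = queue.Queue()
    done = []
    lock = threading.Lock()
    no_more_input = threading.Event()

    def work():
        while True:
            # Read the flag BEFORE polling: if it was already set and the
            # queue is empty afterwards, nothing can arrive any more.
            finished = no_more_input.is_set()
            try:
                entry = todo.get(timeout=0.05)
            except queue.Empty:
                if finished:
                    break
                continue
            with lock:
                done.append(entry.upper())  # stand-in for the hashing step

    workers = [threading.Thread(target=work) for _ in range(n_workers)]
    for w in workers:
        w.start()
    for p in paths:
        todo.put(p)
    no_more_input.set()   # the producer has nothing more to add
    for w in workers:
        w.join()
    return sorted(done)


if __name__ == "__main__":
    print(hash_all(["a", "b", "c"]))
```

The price of this variant is the polling timeout; the sentinel approach avoids it because the workers can block in get() indefinitely.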

Apart from the awkward setup, one thing you definitely should do is optimize the main.compare() method. Since the CSV file doesn't change during execution (and if it does, you should handle that in memory), loading the whole CSV and looping through it for every file's hash you want to compare is ludicrous. Load the whole CSV once as a hash<=>whatever dict and then do the comparisons on the spot (i.e. if hashed in your_map) instead.
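A minimal sketch of that lookup, assuming rows are colon-separated with the hash in the second field and the description in the third, as the split(':') indices in the question suggest. The names load_hash_db and lookup, and the sample rows, are hypothetical.

```python
def load_hash_db(lines):
    """Build a {hash: description} dict from rows like 'id:hash:description'."""
    db = {}
    for row in lines:
        fields = row.rstrip("\n").split(":")
        if len(fields) >= 3:           # skip blank or malformed rows
            db[fields[1]] = fields[2]
    return db


def lookup(db, hashed, path):
    """Return a report string for a known hash, or None if it is not listed."""
    label = db.get(hashed)             # one dict lookup instead of a file scan
    if label is None:
        return None
    return "Suspicious File!\nSuspecfs: " + label + " File name : " + path


if __name__ == "__main__":
    sample = ["1:abc123:Example.Trojan\n", "2:def456:Example.Worm\n"]
    print(lookup(load_hash_db(sample), "abc123", "/home/user/file.bin"))
```

In the scanner, the dict would be built once, e.g. with open("database.csv") as f: db = load_hash_db(f), before any worker starts, and then only read by the threads.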

And lastly, as I mentioned above, time to, hmm, rain on your parade: much of this was for nothing! Due to the dreaded GIL, your threads do not execute Python code in parallel. They do run as separate, honest-to-god system threads, but the GIL ensures that only one of them executes Python bytecode at a time. Blocking file reads release the GIL, and CPython's hashlib releases it as well while hashing larger buffers, so the loading and hashing can overlap to an extent, but the pure-Python work (splitting rows, comparing, queue handling) is serialized, and this code may well be slower, processing-wise, than running everything in one thread. It won't help you much on the way to multiprocessing either, because you cannot share local instance state between processes (well, you can, check this answer, but it's a major PITA and most of the time not worth the trouble).
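If the goal is multiprocessing anyway, one way to sidestep shared instance state is to keep the work in plain module-level functions and let a pool hand out the file paths. The following is a sketch under that assumption, not a drop-in replacement; the names md5_of and hash_files are hypothetical.

```python
import hashlib
import multiprocessing


def md5_of(path, chunk_size=1 << 20):
    """Return (path, md5 hex digest), reading the file in chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as f:
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return path, digest.hexdigest()


def hash_files(paths, workers=4):
    """Hash the files in separate processes; results come back as a dict."""
    with multiprocessing.Pool(workers) as pool:
        return dict(pool.map(md5_of, paths))
```

hash_files() should be called from under an if __name__ == "__main__": guard, since on platforms that spawn worker processes the module is imported again in each worker. Unreadable files would raise inside md5_of, so a real scanner would want to catch OSError there.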

zwer