
I've written this script to read data from a txt file and process it. But it seems that if I give it a big file and a high number of threads, the further it reads into the list, the slower the script gets.

Is there a way to avoid waiting for all the threads to finish, and instead start a new one whenever a thread is done with its work?

Also it seems that when it finishes processing, the script doesn't exit.

import threading, Queue, time

class Work(threading.Thread):

    def __init__(self, jobs):
        threading.Thread.__init__(self)
        self.Lock = threading.Lock()
        self.jobs = jobs

    def myFunction(self):
        #simulate work
        self.Lock.acquire()
        print("Firstname: "+ self.firstname + " Lastname: "+ self.lastname)
        self.Lock.release()
        time.sleep(3)

    def run(self):
        while True:
            self.item = self.jobs.get().rstrip()
            self.firstname = self.item.split(":")[0]
            self.lastname = self.item.split(":")[1]
            self.myFunction()
            self.jobs.task_done()

def main(file):
    jobs = Queue.Queue()
    myList = open(file, "r").readlines()
    MAX_THREADS = 10
    pool = [Work(jobs) for i in range(MAX_THREADS)]
    for thread in pool:
        thread.start()
    for item in myList:
        jobs.put(item)
    for thread in pool:
        thread.join()

if __name__ == '__main__':
    main('list.txt')
  • You are aware of the [**dreaded GIL**](https://wiki.python.org/moin/GlobalInterpreterLock) and why your code won't be executing faster (apart from I/O operations) than running serially, right? – zwer Apr 08 '18 at 01:53
  • Just a style note, the Python convention is to name classes in UpperCamelCase (so Work) and functions in lowercase_separated_by_underscores (so main). – jpyams Apr 08 '18 at 01:54
  • @zwer I am, it explains the first part of my question, but it doesn't explain why it is gradually slowing down. – CarefullyAdvanced Apr 08 '18 at 04:01
  • @jpyams I will take that into account from now on. – CarefullyAdvanced Apr 08 '18 at 04:01

1 Answer


The script probably seems to take longer on larger inputs because of the 3-second pause between each batch of printing: with MAX_THREADS = 10 workers each sleeping 3 seconds per item, N lines take roughly N / 10 * 3 seconds, so the runtime grows linearly with the size of the input.

The script doesn't exit because the worker threads loop forever; since you are using a Queue, you need to call join() on the Queue, not on the individual threads (their run() methods never return). To make sure the script exits once all the jobs have been processed, you should also set daemon = True on each thread.

The Lock will also not work in the current code because each Work instance creates its own threading.Lock() in __init__, so no two threads ever contend for the same lock. All the workers need to share a single lock.

If you want to use this in Python 3 (which you should), the Queue module has been renamed to queue.
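A minimal compatibility import, assuming the rest of the code keeps using the Queue name (this is a sketch, not part of the original code):

try:
    import queue as Queue  # Python 3: the module was renamed to queue
except ImportError:
    import Queue  # Python 2

Here is the corrected version of your script: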

import threading, Queue, time

lock = threading.Lock()  # One lock

class Work(threading.Thread):

    def __init__(self, jobs):
        threading.Thread.__init__(self)
        self.daemon = True  # set daemon
        self.jobs = jobs

    def myFunction(self):
        #simulate work
        lock.acquire()  # All jobs share the one lock
        print("Firstname: "+ self.firstname + " Lastname: "+ self.lastname)
        lock.release()
        time.sleep(3)

    def run(self):
        while True:
            self.item = self.jobs.get().rstrip()
            self.firstname = self.item.split(":")[0]
            self.lastname = self.item.split(":")[1]
            self.myFunction()
            self.jobs.task_done()


def main(file):
    jobs = Queue.Queue()
    with open(file, 'r') as fp:  # Close the file when we're done
        myList = fp.readlines()
    MAX_THREADS = 10
    pool = [Work(jobs) for i in range(MAX_THREADS)]
    for thread in pool:
        thread.start()
    for item in myList:
        jobs.put(item)
    jobs.join()    # Join the Queue


if __name__ == '__main__':
    main('list.txt')

A simpler example (based on an example from the Python docs):

import threading
import time
from Queue import Queue # Py2
# from queue import Queue # Py3

lock = threading.Lock()

def worker():
    while True:
        item = jobs.get()
        if item is None:
            break
        firstname, lastname = item.split(':')
        lock.acquire()
        print("Firstname: " + firstname + " Lastname: " + lastname)
        lock.release()
        time.sleep(3)
        jobs.task_done()

jobs = Queue()
pool = []
MAX_THREADS = 10
for i in range(MAX_THREADS):
    thread = threading.Thread(target=worker)
    thread.start()
    pool.append(thread)

with open('list.txt') as fp:
    for line in fp:
        jobs.put(line.rstrip())

# block until all tasks are done
jobs.join()

# stop workers
for i in range(MAX_THREADS):
    jobs.put(None)
for thread in pool:
    thread.join()
  • Can I avoid the dreaded GIL with multiprocessing instead of threading? And if so, why doesn't using multiprocessing instead of threading in your code help that much in terms of speed? – CarefullyAdvanced Apr 08 '18 at 14:45
  • Based on [this answer](https://stackoverflow.com/a/3044626/5031373), it appears you can get around GIL with the `multiprocessing` module, but it comes with a bit of overhead, and since this code is doing trivial printing, the ability to run concurrent threads doesn't make much of a difference – jpyams Apr 08 '18 at 15:22
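For illustration, here is a minimal multiprocessing sketch of the same workload (a hypothetical rewrite, not from the answer above). Because the work here is print-and-sleep, i.e. I/O-bound, the separate processes add pickling and startup overhead without speeding anything up; the approach only pays off for CPU-bound work:

import multiprocessing, time

def process_item(item):
    # Each call runs in a separate worker process, outside the GIL
    firstname, lastname = item.rstrip().split(':')
    print("Firstname: " + firstname + " Lastname: " + lastname)
    time.sleep(3)  # simulated work; sleeping releases the GIL anyway

if __name__ == '__main__':  # guard is required for multiprocessing on Windows
    with open('list.txt') as fp:
        lines = fp.readlines()
    pool = multiprocessing.Pool(processes=10)
    pool.map(process_item, lines)  # blocks until every item is processed
    pool.close()
    pool.join()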