
I want to use a limited number of threads (at most 2) to run a method of a class that removes some files on disk in the background. The rest of my code within the same class is independent of this background function and may run tens of times more often than it. I still need to enforce the thread/core limit, so it is possible for more than 2 background jobs to be requested at once, in which case they need to be queued. Note that my background function does not take any arguments.

I am pretty new to multi-threading and multi-processing but I think I have done my homework and looked at many posts here on Stack Overflow and tried a couple of approaches. However, none of those approaches seems to work for me. Here's the structure of my code:

class myClass(object):
    def __init__(self):
        # some stuff
        pass
    def backgroundFunc(self):
        # delete some files on disk
        pass
    def mainFunc(self, elem):
        # Do some other things, then:
        self.backgroundFunc()  # I want to run this in the background

Here's how I run the code

from myClass import myClass

myClassInstance = myClass()
for element in someList:
    myClassInstance.mainFunc(elem=element)

Note that I cannot start the background job before the stuff in mainFunc has started running.

And here is my first try with threading in my class file:

from threading import Thread

class myClass(object):
    def __init__(self):
        # some stuff
        pass
    def backgroundFunc(self):
        # delete some files on disk
        pass
    def mainFunc(self, elem):
        # Do some other things, then:
        thr = Thread(target=self.backgroundFunc)
        thr.start()

However, the problem with this approach is that the program crashes at random times: sometimes right at the beginning of program execution, sometimes later, and the error messages are different every time. I suspect a race condition: the threads share memory with the rest of the program, so data may be read and written concurrently without any synchronization. Or, less likely, it could be because I am running my code on a server that enforces some limits on the allocated resources. In addition, with this approach I cannot cap the number of threads or queue jobs, so if mainFunc runs again while two background jobs are already in flight, a third thread simply gets started.

Here's another try with multiprocessing.Process:

from multiprocessing import Process

class myClass(object):
    def __init__(self):
        # some stuff
        pass
    def backgroundFunc(self):
        # delete some files on disk
        pass
    def mainFunc(self, elem):
        # Do some other things, then:
        p = Process(target=self.backgroundFunc)
        p.start()

The problem with this approach is that every call to mainFunc spawns a new process, so the background processes pile up and use as many cores as my machine has at its disposal; since the rest of my code then runs in parallel with all of them, everything becomes very slow very quickly.

I eventually arrived at multiprocessing.Pool, but I am still pretty confused about how to use it effectively. Anyway, here's my try with Pool:

from multiprocessing import Pool

class myClass(object):
    def __init__(self):
        # some stuff
        self.pool = Pool(processes=2)
    def backgroundFunc(self):
        # delete some files on disk
        print('some stuff')
    def mainFunc(self, elem):
        # Do some other things, then:
        self.pool.apply_async(self.backgroundFunc)

However, apply_async does not seem to work: none of the print statements in backgroundFunc ever print anything to the screen. I added self.pool.close() after apply_async, but then I get errors soon after the second process starts. I tried self.pool.apply and a few other methods, but they seem to require a function that takes arguments, and my backgroundFunc takes none. Finally, I do not know how to do the queuing that I explained earlier using Pool.
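
In case it helps to diagnose this, here is a standalone sketch (simplified, not my actual class) of how I understand errors can be surfaced: apply_async returns an AsyncResult, and calling .get() on it blocks until the task finishes and re-raises any exception that happened in the worker.

from multiprocessing import Pool
import os

def backgroundFunc():
    # stand-in for deleting files on disk
    print('running in worker pid', os.getpid())

if __name__ == '__main__':
    pool = Pool(processes=2)
    result = pool.apply_async(backgroundFunc)
    result.get(timeout=10)  # surfaces errors that apply_async otherwise swallows
    pool.close()
    pool.join()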

Also, I want to have control over how many times and when backgroundFunc runs, and mainFunc should not wait for all threads to finish before it returns. If it did, I would not benefit from multithreading, because the background function might take too long to finish. Maybe I should have been clearer in the question; sorry about that.

So I would really appreciate it if someone could help me with this. I am pretty confused. Thanks in advance!

  • Why do you want to use a limited number of threads? Python threads are not the same as system threads and won’t consume more cpu cores. – Joel Cornett Mar 03 '18 at 17:28
  • @JoelCornett Because I need all other cores to spend time on doing some other processes that I care much more about. – Amir Mar 03 '18 at 17:50
  • To reiterate, python threads are not system threads, they don’t consume multiple cores. What sort of background work are you doing? Is it IO bound or CPU bound? – Joel Cornett Mar 03 '18 at 18:09
  • @JoelCornett Sorry, I'm pretty new to multi-threading. The background work is IO bound. So far, the easiest approach for me has been `multiprocessing.Process`, but for some reason things become drastically slow very quickly for the other operations in the main function. I don't know any better solution. – Amir Mar 03 '18 at 18:23
  • Please elaborate on ["I want to have control over how many times and when I want to run backgroundFunc"](https://stackoverflow.com/questions/49081260/executing-a-function-in-the-background-while-using-limited-number-of-cores-threa#comment85176732_49082493). – unutbu Mar 03 '18 at 22:10
  • Perhaps what you are looking for is [a Semaphore to control how many workers are active at one time](https://pymotw.com/3/multiprocessing/communication.html#controlling-concurrent-access-to-resources) (a rough sketch of this idea follows these comments). – unutbu Mar 03 '18 at 22:16
  • By the way, you might be interested in using a tool like [`watchdog`](https://pypi.python.org/pypi/watchdog) to react to filesystem events. Then, instead of calling `backgroundFunc` multiple times, you could have one process reacting to filesystem events continually until your program terminates. The filesystem-monitoring process won't consume much CPU when there is no filesystem activity. – unutbu Mar 03 '18 at 23:03
  • @unutbu By *having control over how many times I want to run `backgroundFunc`* I meant that I do not want the behavior of `Process`, where you have no control over the things (e.g. `self` variables) inside `backgroundFunc` while it is running. Also, you cannot make sure only two background jobs are running. I'll look into the things you mentioned here and will get back to you sometime soon. – Amir Mar 05 '18 at 15:46
  • @unutbu I still have to figure things out here. Meanwhile, could you please take a look at another question I need some help with? I want to parallelize two calls to the same function, each with different arguments, in a for loop, but I do not know how to do that in practice. Could you please take a look at my question [here](https://stackoverflow.com/questions/49145136/how-to-use-multiprocessing-to-parallelize-two-calls-to-the-same-function-with-d) and see if you can offer a solution? – Amir Mar 07 '18 at 06:08
  • It appears you have already received a number of answers so I'll just leave 2 general tips here: (1) If you plan on using multiprocessing in Python, reading [Doug Hellman's tutorial on multiprocessing](https://pymotw.com/3/multiprocessing/index.html) is time very well spent. It will show you, with short, runnable examples, all the main design patterns available to you. – unutbu Mar 07 '18 at 16:13
  • (2) If the answers you are receiving are not satisfactory, it may help to clarify your questions using a [runnable MCVE](https://stackoverflow.com/help/mcve). For example, instead of mentioning "Each of these functions will execute some parallel operations", show us in the code. We probably don't need to see the long serial computation (so substituting `time.sleep(1)` may suffice), but we do need to see the structure of your parallel operations and overall program. A small runnable example may enable us to help you better. – unutbu Mar 07 '18 at 16:22
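
For future readers, here is my rough, untested sketch of the Semaphore idea unutbu suggested above (all names made up): the threads are still created eagerly, but at most two of them run the guarded section at the same time.

import threading, time

sem = threading.BoundedSemaphore(2)  # at most two workers inside at once

def backgroundFunc():
    with sem:  # blocks while two workers are already in the guarded section
        time.sleep(0.5)  # stand-in for deleting files on disk
        print('done in', threading.current_thread().name)

threads = [threading.Thread(target=backgroundFunc) for _ in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()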

1 Answer


the program crashes randomly. I guess

It would be easier to concentrate on one problem at a time, without guessing, so, what's the crash?

Here's a test with threading that might inspire you, based on the example in the queue module documentation.

#!python3
#coding=utf-8
""" https://stackoverflow.com/q/49081260/ """

import sys, time, threading, queue

print(sys.version)

class myClass:
    """ """

    def __init__(self):
        """ """
        self.q = queue.Queue()
        self.threads = []
        self.num_worker_threads = 2

    def backgroundFunc(self):
        """ """
        print("thread started")
        while True:
            item = self.q.get()
            if item is None:
                #self.q.task_done()
                break
            print("working on ", item)
            time.sleep(0.5)
            self.q.task_done()
        print("thread stopping")

    def mainFunc(self):
        """ """

        print("starting thread(s)")
        for i in range(self.num_worker_threads):
            t = threading.Thread(target=self.backgroundFunc)
            t.start()
            self.threads.append(t)

        print("giving thread(s) some work")
        for item in range(5):
            self.q.put(item)

        print("giving thread(s) more work")
        for item in range(5,10):
            self.q.put(item)

        # block until all tasks are done
        print("waiting for thread(s) to finish")
        self.q.join()

        # stop workers: one None sentinel per thread makes its loop break
        print("stopping thread(s)")
        for i in range(self.num_worker_threads):
            self.q.put(None)
        for t in self.threads:
            t.join()

        print("finished")



if __name__ == "__main__":
    print("instance")
    myClassInstance = myClass()

    print("run")
    myClassInstance.mainFunc()

    print("end.")

It prints

3.6.1 (v3.6.1:69c0db5, Mar 21 2017, 17:54:52) [MSC v.1900 32 bit (Intel)]
instance
run
starting thread(s)
thread started
thread started
giving thread(s) some work
giving thread(s) more work
waiting for thread(s) to finish
working on  0
working on  1
working on  2
working on  3
working on  4
working on  5
working on  6
working on  7
working on  8
working on  9
stopping thread(s)
thread stopping
thread stopping
finished
end.
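
If you'd rather not manage the queue and the worker threads yourself, concurrent.futures gives roughly the same behavior with less code. A minimal sketch of that idea (not tested against your real workload): submit() returns immediately, at most max_workers tasks run at once, and further submissions wait in the executor's internal queue.

from concurrent.futures import ThreadPoolExecutor

class myClass:
    def __init__(self):
        # at most two backgroundFunc calls run concurrently;
        # extra submissions queue up inside the executor
        self.executor = ThreadPoolExecutor(max_workers=2)

    def backgroundFunc(self):
        pass  # delete some files on disk

    def mainFunc(self, elem):
        # do the other work first, then hand the cleanup to the pool;
        # submit() does not block, so mainFunc returns right away
        self.executor.submit(self.backgroundFunc)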
  • Move the thread management to `__init__` (also maybe https://docs.python.org/3.6/reference/datamodel.html#object.__del__) if `mainFunc` gets called more than once. For several instances of myClass, see https://stackoverflow.com/questions/68645/static-class-variables-in-python – handle Mar 03 '18 at 09:42
  • I have not run your code but it seems to me that your code assigns `backgroundFunc` to two threads immediately but I do not want that to be the case. I want to have control over how many times and when I want to run `backgroundFunc`. Also, `mainFunc` should not wait for all threads to finish running before it exits. If that happens, I won't benefit from multi threading because the background function might take too long to finish. Maybe I should have been more clear in the question; sorry about that. – Amir Mar 03 '18 at 15:20
  • In addition, from the impression I got from all other posts, your code seems a bit too complicated. Is there a way to simplify it? – Amir Mar 03 '18 at 15:20
  • I've retried the original example code: it does work as-is. I had issues with `q.join()` blocking, and added `task_done()` to solve that - commented out now in the code above, but not tested again. – handle Mar 03 '18 at 17:02
  • I'm gonna try your proposed method sometime soon and will get back to you on it. Thank you! – Amir Mar 05 '18 at 15:40