0

I'm writing a script that processes several different instances of a Class object, which contains a number of attributes and methods. The objects are all placed in a single list (myobjects = [myClass(IDnumber=1), myClass(IDnumber=2), myClass(IDnumber=3)], and then modified by fairly simplistic for loops that call specific functions from the objects, of the form

for x in myobjects: 
    x.myfunction()

This script utilizes logging, to forward all output to a logfile that I can check later. I'm attempting to parallelize this script, because it's fairly straightforward to do so (example below), and need to utilize a queue in order to organize all the logging outputs from each Process. This aspect works flawlessly- I can define a new logfile for each process, and then pass the object-specific logfile back to my main script, which can then organize the main logfile by appending each minor logfile in turn.

from multiprocessing import Process, Queue
q = Queue()
threads = []
mainlog = 'mylogs.log'    #this is set up in my __init__.py but included here as demonstration
for x in myobjects:
    logfile = x.IDnumber+'.log'
    thread = Process(target=x.myfunction(), args=(logfile, queue))
    threads.append(thread)
    thread.start()
for thread in threads:
    if thread.is_alive():
        thread.join()
while not queue.empty():
    minilog = queue.get()
    minilog_open = open(minilog, 'r')
    mainlog_open = open(mainlog, 'a+')
    mainlog_open.write(minilog_open.read())

My problem, now, is that I also need these objects to update a specific attribute, x.success, as True or False. Normally, in serial, x.success is updated at the end of x.myfunction() and is sent down the script where it needs to go, and everything works great. However, in this parallel implementation, x.myfunction populates x.success in the Process, but that information never makes it back to the main script- so if I add print(success) inside myfunction(), I see True or False, but if I add for x in myobjects: print(x.success) after the queue.get() block, I just see None. I realize that I can just use queue.put(success) in myfunction() the same way I use queue.put(logfile), but what happens when two or more processes finish simultaneously? There's no guarantee (that I know of) that my queue will be organized like

  1. logfile (for myobjects[0])
  2. success = True (for myobjects[0])
  3. logfile (for myobjects[1])
  4. success = False (for myobjects[1]) (etc etc)

How can I organize object-specific outputs from a queue, if this queue contains both logfiles and variables? I need to know the content of x.success for each x.myfunction(), so that information has to come back to the main process somehow.

tashton
  • 59
  • 7
  • Just write a wrapper function that takes x as one of its arguments. Within the wrapper you can then call x.myfunction() and update x.success as appropriate. As an aside, you have some problems with "dangling" file handles. Consider using a context manager –  Aug 12 '21 at 17:33
  • I'm not sure what you mean by wrapper, and I'm not too familiar with context managers. Could you give me a very small example? – tashton Aug 12 '21 at 18:58

1 Answers1

0

OP has request an example to demonstrate concepts mentioned in my comment. Explanation follows the code:-

import concurrent.futures


class MyObject:
    def __init__(self):
        self._ID = str(id(self))
        self._status = None

    @property
    def ID(self):
        return self._ID

    @property
    def status(self):
        return self._status

    @status.setter
    def status(self, status):
        self._status = status

    def MyFunction(self):
        # do the real work here
        self.status = True


def MyThreadFunc(args):
    myObject = args[0]
    myObject.MyFunction()
    # note that the wrapper function returns a tuple
    return myObject.status, myObject.ID


if __name__ == '__main__':
    N = 10  # number of instances of MyObject
    myObjects = [MyObject() for _ in range(N)]
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = {executor.submit(MyThreadFunc, [o]): o for o in myObjects}
        for future in concurrent.futures.as_completed(futures):
            _status, _id = future.result()
            print(f'Status is {_status} for ID {_id}')

The class MyObject obviously doesn't do very much. The key features are that it has a string version of its id, a status and a function that does something but implicitly returns None.

We write a wrapper function that takes a reference to an instance of MyObject (first element in the iterable args), executes MyFunction() on that particular class instance then return that class's ID and status as a tuple.

The main loop uses a pattern that I use a lot and I'm sure many others do too. Using a dictionary comprehension, we build the so-called "futures". Remember that the second argument to submit() must be an iterable even though MyThreadFunc only needs one value.

We then wait for the threads to complete and get their return values.

  • This is very interesting. I see how it all fits together, more or less. Could you explain @ property and @ status.setter though? I'm pretty sure those are context managers but I'm not quite clear on what they do – tashton Aug 13 '21 at 14:42
  • They are decorators. See https://stackoverflow.com/questions/17330160/how-does-the-property-decorator-work-in-python –  Aug 14 '21 at 07:29
  • Lots of reading to do, I see. My last question is about the logging: what would be the best way, in this implementation, to handle the logs? Each task in `MyObject` produces output that goes into the logfile, so would the format you provided append to the log simultaneously (and possibly unordered)? It's important that the log is organized, either by task or by `MyObject` instance – tashton Aug 16 '21 at 13:38