3

I am running pool.map on a big data array and I want to print a report in the console every minute. Is it possible? As I understand it, Python is a synchronous language and can't do this the way Node.js does.

Perhaps it can be done with threading... or how?

from time import sleep
from multiprocessing.pool import ThreadPool

finished = 0

def make_job(item):
    sleep(1)
    global finished
    finished += 1

# I want to call this function every minute
def display_status():
    print 'finished: ' + str(finished)

def main():
    data = [...]
    pool = ThreadPool(45)
    results = pool.map(make_job, data)
    pool.close()
    pool.join()
avasin

3 Answers

4

You can use a permanent threaded timer, like those from this question: Python threading.timer - repeat function every 'n' seconds

from threading import Timer, Event

class PerpetualTimer(object):

    # give it a cycle time (t) and a callback (hFunction)
    def __init__(self, t, hFunction):
        self.t = t
        self.stop = Event()
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        if not self.stop.is_set():
            self.thread.start()

    def start(self):
        self.stop.clear()
        self.thread.start()

    def cancel(self):
        self.stop.set()
        self.thread.cancel()

Basically this is just a wrapper around a Timer object that creates a new Timer every time your desired function is called. Don't expect millisecond accuracy (or even close) from this, but for your purposes it should be ideal.

Using this, your example would become:

finished = 0

def make_job(item):
    sleep(1)
    global finished
    finished += 1

def display_status():
    print 'finished: ' + str(finished)

def main():
    data = [...]
    pool = ThreadPool(45)

    # set up the monitor to run the function every minute
    monitor = PerpetualTimer(60, display_status)
    monitor.start()
    results = pool.map(make_job, data)
    pool.close()
    pool.join()
    monitor.cancel()

EDIT:

A cleaner solution may be (thanks to the comments below):

from threading import Event, Thread

class RepeatTimer(Thread):
    def __init__(self, t, callback, event):
        Thread.__init__(self)
        self.stop = event
        self.wait_time = t
        self.callback = callback
        self.daemon = True

    def run(self):
        # Event.wait returns False on timeout and True once the event
        # is set, so this fires the callback every t seconds until stopped
        while not self.stop.wait(self.wait_time):
            self.callback()

Then in your code:

def main():
    data = [...]
    pool = ThreadPool(45)
    stop_flag = Event()
    RepeatTimer(60, display_status, stop_flag).start()
    results = pool.map(make_job, data)
    pool.close()
    pool.join()
    stop_flag.set()
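
One caveat worth adding (my note, not part of this answer): `finished += 1` is not atomic in CPython, so with 45 worker threads some increments can occasionally be lost. A minimal sketch that guards the counter with a threading.Lock:

from threading import Lock
from time import sleep

finished = 0
finished_lock = Lock()

def make_job(item):
    sleep(1)
    global finished
    # serialize the read-modify-write so no increment is lost
    with finished_lock:
        finished += 1

The lock is cheap here, since each job sleeps for a second anyway.
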
ebarr
  • Nice idea, though you have a race condition here. If `monitor.cancel()` is called while `.handle_function()` is executing `self.hFunction()`, `.handle_function()` will run to completion anyway, so it will create the next Timer thread and effectively the PerpetualTimer will keep working indefinitely, preventing the program from finishing. – RobertT Sep 11 '14 at 06:55
  • @RobertT Yup, the PerpetualTimer could be redone in a number of thread-safe ways. I have added a fix that should hopefully handle the specific race condition you mentioned. – ebarr Sep 11 '14 at 07:33
  • Looks like you've closed the door but opened a window (albeit a very small one). This time the race is in `.start()` and needs a quite heavily overloaded computer. If the PerpetualTimer instance was used at least once, self.stop is set, so if after `self.thread.start()` some heavy load lets the Timer start, wait the given time, and execute up to `if not self.stop.is_set():` before the main thread catches up and executes `self.stop.clear()`, monitoring will die on the first iteration. – RobertT Sep 11 '14 at 14:38
  • I don't even want to think (let alone calculate) how improbable it is, but the fact that 45 other potentially resource-heavy threads work in parallel may give it a slight chance to happen. Of course, simply swapping those two calls in `.start()` solves the issue. Another thing: if you've decided to use Event(), there are much cleaner and nicer solutions on the page you've linked. Also, in this case `Event` is not really needed, as the code never waits on it, so a simple boolean flag would do the job (sketched below). – RobertT Sep 11 '14 at 14:45
  • @RobertT I think I must be going blind! I completely missed the accepted answer on the question I linked to. You are correct, it is considerably better. – ebarr Sep 11 '14 at 23:32
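
To illustrate RobertT's last point, here is a minimal sketch (my addition, not from the thread; the name PerpetualTimerFlag is made up): because the flag is only ever checked, never waited on, a plain boolean attribute can stand in for the Event:

from threading import Timer

class PerpetualTimerFlag(object):

    # same idea as PerpetualTimer, but with a plain boolean
    # instead of an Event, since nothing ever waits on the flag
    def __init__(self, t, hFunction):
        self.t = t
        self.stopped = True
        self.hFunction = hFunction
        self.thread = Timer(self.t, self.handle_function)

    def handle_function(self):
        self.hFunction()
        self.thread = Timer(self.t, self.handle_function)
        if not self.stopped:
            self.thread.start()

    def start(self):
        self.stopped = False
        self.thread.start()

    def cancel(self):
        self.stopped = True
        self.thread.cancel()
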
2

One way to do this is to use the main thread as the monitoring one. Something like the following should work:

def main():
    data = [...]
    results = []
    step = 0
    pool = ThreadPool(16)
    pool.map_async(make_job, data, callback=results.extend)
    pool.close()
    while True:
        if results:
            break
        step += 1
        sleep(1)
        if step % 60 == 0:
            print "status update" + ...

I've used .map_async() instead of .map(), as the latter is synchronous (it blocks until all results are ready). Also, you will probably want to replace results.extend with something more efficient. And finally, due to the GIL, the speed improvement may be much smaller than expected.
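
A variant of this idea (my sketch, not part of the original answer) is to poll the AsyncResult that map_async() returns instead of installing a callback; it assumes the question's make_job and global finished counter:

def main():
    data = [...]
    pool = ThreadPool(16)
    # map_async returns an AsyncResult the main thread can poll
    async_result = pool.map_async(make_job, data)
    pool.close()
    while not async_result.ready():
        # wait() returns after the timeout, or sooner if all jobs finish
        async_result.wait(60)
        print 'finished: ' + str(finished)
    results = async_result.get()
    pool.join()

This avoids the one-second polling loop and the manual step counter.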

BTW, it is a little bit funny that you wrote that Python is synchronous in a question that asks about ThreadPool ;).

RobertT
1

Consider using the time module. The time.time() function returns the current UNIX time.

For example, calling time.time() right now returns 1410384038.967499. One second later, it will return 1410384039.967499.

The way I would do this would be to use a while loop in place of results = pool.map(...), and on every iteration run a check like this:

import time

last_time = time.time()
while (...):
    new_time = time.time()
    if new_time > last_time + 60:
        print "status update" + ...
        last_time = new_time
    (your computation here)

So that will check if (at least) a minute has elapsed since your last status update. It should print a status update approximately every sixty seconds.

Sorry that this is an incomplete answer, but I hope this helps or gives you some useful ideas.
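
To make that concrete, here is a sketch combining this timing check with the question's pool (my addition; imap_unordered and the surrounding names are assumptions beyond the original answer):

import time
from multiprocessing.pool import ThreadPool

def main():
    data = [...]
    pool = ThreadPool(45)
    results = []
    last_time = time.time()
    # imap_unordered yields each result as soon as it completes,
    # giving the main thread regular chances to check the clock
    for result in pool.imap_unordered(make_job, data):
        results.append(result)
        new_time = time.time()
        if new_time > last_time + 60:
            print 'finished: ' + str(len(results))
            last_time = new_time
    pool.close()
    pool.join()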

Newb