2

Suppose I have the following object with multiple expensive properties, as so:

class Object:

  def __init__(self, num):
    self.num = num

  @property 
  def expensive_property(self):
    return expensive_calculation

  @property 
  def expensive_property1(self):
    return expensive_calculation

  @property 
  def expensive_property2(self):
    return expensive_calculation

Note: The number of expensive properties may increase over time. Given a list of Objects how could I compute each expensive property per thread, for all objects in the list. I am having a hard time figuring out how I should arrange my pool.

This is kinda what I am trying to achieve:

from multithreading.dummy import Pool
from multithreading.dummy import Queue

object_list = [Object(i) for i in range(20)]
properties = [expensive_property2, expensive_propert5, expensive_property9, expensive_property3]


def get(obj, expensive_property):
  return [getattr(expensive_property, o) for o in obj]

tasks = Queue()
for p in properties :
  tasks.put((get, o, p))

results = []

with Pool(len(properties )) as pool:
   while True:
      task = tasks.get()
      if task is None:
        break
      func, *args = task
      result = pool.apply_async(func, args)
      results.append(result)
callmeGuy
  • 944
  • 2
  • 11
  • 28

1 Answers1

1

This is a little crazy because apply_async has an internal queue to distribute tasks over the pool. I can imagine reasons to have another queue around observability or backpressure. Is your example your full program? or are you enqueuing work from a different process/thread?

If your computation is CPU bound one option could be to remove the queue to make things a little simpler:

def wait_all(async_results, timeout_seconds_per_task=1):
   for r in async_results:
      r.get(timeout_seconds)

wait_all(
  [pool.apply_async(get, (o, p)) for p in properties],
  timeout_seconds_per_task=1,
)

Like your example above this allows you to distribute computation across your available cpus (pool even defaults to the number of cpus on your machine). If your work is IO bound (suggested by your sleep) processes may have diminishing returns.

You'd have to benchmark but for IO bound you could create a thread pool using the same pattern https://stackoverflow.com/a/3034000/594589

Other options could be to use nonblocking IO with event loop such as gevent, or asyncio. Both would allow you to model the same pool based pattern!

dm03514
  • 54,664
  • 18
  • 108
  • 145
  • this is just part of my program. To be honest, I know my example is a bit dumb, especially with the get() method. I actually can multithread it any way I need to, it just needs to get each property on its own thread – callmeGuy Dec 11 '18 at 00:12
  • the answer does not need to follow my example, I just wrote that to kinda suggest what i have to do. The problem is much simpler, get each property on its own thread (not process) for all objects in the list – callmeGuy Dec 11 '18 at 00:15