
EDIT: Updated with environment information (see first section)

Environment

I'm using Python 2.7

Ubuntu 16.04

Issue

I have an application which I've simplified into a three-stage process:

  1. Gather data from multiple data sources (HTTP requests, system info, etc)
  2. Compute metrics based on this data
  3. Output these metrics in various formats

Each of these stages must complete before moving on to the next, but each stage consists of multiple sub-tasks that can run in parallel (e.g. I can send off three HTTP requests and read system logs while waiting for them to return).

I've divided up the stages into modules and the sub-tasks into submodules, so my project hierarchy looks like so:

+ datasources
|-- __init__.py
|-- data_one.py
|-- data_two.py
|-- data_three.py
+ metrics
|-- __init__.py
|-- metric_one.py
|-- metric_two.py
+ outputs
|-- __init__.py
|-- output_one.py
|-- output_two.py
- app.py

app.py looks roughly like so (pseudo-code for brevity):

import datasources
import metrics
import outputs

for datasource in dir(datasources):
    datasource.refresh()
for metric in dir(metrics):
    metric.calculate()
for output in dir(outputs):
    output.dump()

(There's additional code wrapping the dir call to ignore system modules, there's exception handling, etc -- but this is the gist of it)

Each datasource sub-module looks roughly like so:

data = []

def refresh():
    # Populate the "data" member somehow
    global data
    data = [1, 2, 3]

Each metric sub-module looks roughly like so:

import datasources.data_one as data_one
import datasources.data_two as data_two

data = []

def calculate():
    # Use the datasources to compute the metric
    global data
    data = [sum(x) for x in zip(data_one.data, data_two.data)]

In order to parallelize the first stage (datasources) I wrote something simple like the following:

def run_thread(datasource):
    datasource.refresh()

threads = []
for datasource in dir(datasources):
    thread = threading.Thread(target=run_thread, args=(datasource,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()

This works: afterwards every datasources.x.data attribute is populated, so I can compute any metric.

To parallelize the second stage (metrics), which depends less on I/O and more on CPU, I felt that simple threading wouldn't actually speed things up and that I would need the multiprocessing module to take advantage of multiple cores. I wrote the following:

def run_pool(calculate):
    calculate()

pool = multiprocessing.Pool()
pool.map(run_pool, [m.calculate for m in dir(metrics)])
pool.close()
pool.join()

This code runs for a few seconds (so I think it's working?) but then when I try:

metrics.metric_one.data

it returns [], as if the module was never run.

Somehow, using the multiprocessing module seems to scope the workers so that they no longer share the data attribute. How should I rewrite this so that I can compute each metric in parallel, taking advantage of multiple cores, but still have access to the data when it's done?

stevendesu
  • See [here](https://stackoverflow.com/questions/3044580/multiprocessing-vs-threading-python?rq=1) for more information on the differences between multiprocessing and threading – bendl Jul 07 '17 at 15:27

2 Answers


Process and Thread behave quite differently in Python. If you want to use multiprocessing, you will need to use a synchronized data type to pass information around.

For example, you could use a multiprocessing.Array, which can be shared between your processes.

For detail see the docs: https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes
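
A minimal sketch of that idea (the worker function below is a stand-in, not your metric code): a child process writes into a shared Array and the parent can read the result after join():

import multiprocessing

def calculate(shared):
    # Runs in the child process; writes into memory shared with the parent
    for i in range(len(shared)):
        shared[i] = i * i

if __name__ == '__main__':
    shared = multiprocessing.Array('i', 3)   # 'i' = C int, 3 elements, starts as zeros
    p = multiprocessing.Process(target=calculate, args=(shared,))
    p.start()
    p.join()
    print(list(shared))   # [0, 1, 4] -- visible in the parent, unlike a plain module global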

ebarr
  • This gives me minor direction for where to start with my Googling but doesn't really answer the question at hand like Scott Mermelstein's answer – stevendesu Jul 07 '17 at 18:10

Updated again, per comments: Since you're on 2.7 and you're dealing with modules instead of objects, you're having problems pickling what you need. The workaround is not pretty: it involves passing the name of each module to your operating function. I updated the partial section below accordingly, and also removed the with syntax.

A few things:

First, in general, it's better to use multiple processes than threads. With threading you always run the risk of fighting the Global Interpreter Lock, which can be extremely inefficient for CPU-bound work; that becomes a non-issue with multiple processes.
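
As a rough, machine-dependent illustration of that point (the busy() function below is only a stand-in for a CPU-bound metric, not code from the question): the same work pushed through a thread pool is serialized by the GIL, while a process pool actually uses the extra cores:

import time
import multiprocessing
import multiprocessing.dummy   # same Pool API, but backed by threads

def busy(n):
    # Pure-Python CPU-bound loop
    total = 0
    i = 0
    while i < n:
        total += i * i
        i += 1
    return total

if __name__ == '__main__':
    work = [3000000] * 4

    start = time.time()
    multiprocessing.dummy.Pool(4).map(busy, work)   # threads: bound by the GIL
    print('threads:   %.2fs' % (time.time() - start))

    start = time.time()
    multiprocessing.Pool(4).map(busy, work)         # processes: real parallelism
    print('processes: %.2fs' % (time.time() - start))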

Second, you've got the right concept, but you complicate it by keeping a module-global data member. Make your sources return the data you're interested in, and make your metrics (and outputs) take a list of data as input and return their resulting list.

This would turn your pseudocode into something like this:

app.py:

import datasources
import metrics
import outputs

pool = multiprocessing.Pool()
data_list = pool.map(lambda o: o.refresh(), list(dir(datasources)))
pool.close()
pool.join()

pool = multiprocessing.Pool()
metrics_funcs = [(m, data_list) for m in dir(metrics)]
metrics_list = pool.map(lambda m: m[0].calculate(m[1]), metrics_funcs)
pool.close()
pool.join()

pool = multiprocessing.Pool()
output_funcs = [(o, data_list, metrics_list) for o in dir(outputs)]
output_list = pool.map(lambda o: o[0].dump(o[1], o[2]), output_funcs)
pool.close()
pool.join()

Once you do this, your data source would look like this:

def refresh():
    # Populate the "data" member somehow
    return [1, 2, 3]

And your metrics would look like this:

def calculate(data_list):
    # Use the datasources to compute the metric
    return [sum(x) for x in zip(*data_list)]

And finally, your output could look like this:

def dump(data_list, metrics_list):
    # do whatever; you now have all the information
    pass

Removing the data "global" and passing it explicitly makes each piece a lot cleaner (and a lot easier to test), and it makes each piece completely independent. As you can see, all I'm doing is changing what's in the list that gets passed to map; in this case, I inject all the previous calculations by passing them as a tuple and unpacking them in the function. You don't have to use lambdas, of course. You can define each function separately, though there's really not much to define. If you do define each function, you could use partial functions to reduce the number of arguments you pass. I use that pattern a lot, and in your more complicated code you may need to. Here's one example:

from functools import partial

def do_dump(module_name, data_list, metrics_list):
    globals()[module_name].dump(data_list, metrics_list)

invoke = partial(do_dump, data_list=data_list, metrics_list=metrics_list)
pool = multiprocessing.Pool()
output_list = pool.map(invoke, [o.__name__ for o in dir(outputs)])
pool.close()
pool.join()

Update, per comments:

When you use map, you're guaranteed that the order of your inputs matches the order of your outputs, i.e. data_list[i] is the output for running dir(datasources)[i].refresh(). Rather than importing the datasources modules into metrics, I would make this change to app.py:

data_list = ...
pool.close()
pool.join()
data_map = {name: data_list[i] for i, name in enumerate(dir(datasources))}

And then pass data_map into each metric. Then the metric gets the data that it wants by name, e.g.

d1 = data_map['data_one']
d2 = data_map['data_two']
return [sum(x) for x in zip(d1, d2)]
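
For completeness, here is one way the two updates could fit together in a 2.7-friendly app.py. This is only a sketch: the run_datasource/run_metric helpers and the hard-coded name lists are mine, not from the question, and exception handling is left out:

import importlib
import multiprocessing

def run_datasource(name):
    # Import inside the worker so only the module name (a plain string,
    # which pickles fine) has to cross the process boundary
    module = importlib.import_module('datasources.' + name)
    return module.refresh()

def run_metric(args):
    name, data_map = args            # strings, dicts and lists all pickle
    module = importlib.import_module('metrics.' + name)
    return module.calculate(data_map)

if __name__ == '__main__':
    source_names = ['data_one', 'data_two', 'data_three']
    metric_names = ['metric_one', 'metric_two']

    pool = multiprocessing.Pool()
    data_list = pool.map(run_datasource, source_names)
    pool.close()
    pool.join()

    # pool.map preserves input order, so the names line up with the results
    data_map = dict(zip(source_names, data_list))

    pool = multiprocessing.Pool()
    metrics_list = pool.map(run_metric, [(name, data_map) for name in metric_names])
    pool.close()
    pool.join()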
Scott Mermelstein
  • Quick question with your examples. How do you keep organized which data source an element in the `data_list` came from? Suppose I have 5 data sources that I'm pulling in, but a metric only depends on two of them. How do I make sure I read the appropriate data sources? I feel like this will return a list of data elements but no way of knowing which data source maps to which item in the list. – stevendesu Jul 07 '17 at 15:50
  • Going to test this out now, but it looks like this will solve everything for me :) May take me a bit to get it working with the exception handling and other random checks. Plus the fact that I'm a Python newb doesn't help :D – stevendesu Jul 07 '17 at 17:25
  • I'm having some issues :( At first it complained about multiprocessing.Pool() not having an `__exit__` attribute (I'm using Python 2.7). I removed the "with" clause and fixed that, but now it's complaining that it can't pickle a `` – stevendesu Jul 07 '17 at 18:05
  • Either in the tags, or in your question, you should make it clear that you're using 2.7 (not just in this comment, so other readers and answerers see it in the question). I was giving code for python 3.5+, and you're right, getting rid of the "with" block is the fix for that part. It also helps to tell what operating system you're on, because multiprocessing acts differently on Unix vs. Windows (and I'm guessing from your error that you're on Windows). For this problem, avoid iterating on the function; instead iterate on the object and reference the object's function within your code. – Scott Mermelstein Jul 07 '17 at 18:09
  • I updated the original post with the Python version and OS. I'm running Ubuntu 16.04, not Windows. Also updating it to pass the object and not the function failed with a similar error: can't pickle a `` – stevendesu Jul 07 '17 at 18:16
  • Ok. I'm surprised that 2.7 multiprocessing is so different. In Python 3, multiprocessing can start by either `fork` or `spawn`, and when it `fork`s, you don't have to worry about pickling - it just uses unix fork. Evidently, in Python 2, you're still using the `spawn` method, which pickles your state and passes it on... – Scott Mermelstein Jul 07 '17 at 18:26
  • ... There are workarounds, none of which are pretty. Ideally, you should be using objects in some way, since your usage is a clear case of polymorphism. The object would then be pickleable, and not be a problem. What's probably easiest is to pass each module's `__name__` instead of the module or function. Then access the module (and its function) from within your code. – Scott Mermelstein Jul 07 '17 at 18:27
  • After some experimentation I got something that works! Instead of passing `dir(module)` as the second argument to `pool.map`, I passed `[m.refresh for m in dir(module)]` (or, alternatively, a tuple for m in dir(module) when I needed to also access `m.__name__` and other values) and it worked! I'm now up and running with multicore processing, my only remaining issue is when an exception is thrown in the subprocess I can't pass the traceback back because traceback objects can't be pickled. – stevendesu Jul 07 '17 at 18:52
  • Good, I'm glad you're getting there! Evidently, [map_async](https://docs.python.org/2.7/library/multiprocessing.html#multiprocessing.pool.AsyncResult.get) will forward exceptions, even in python 2.7. I've never used it, but it shouldn't be hard to go from `map` to `map_async`. – Scott Mermelstein Jul 07 '17 at 18:57
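
A small sketch of that map_async suggestion (the deliberately failing calculate() below is made up for illustration): calling .get() on the AsyncResult re-raises the worker's exception in the parent process, although under 2.7 the original traceback is still lost:

import multiprocessing

def calculate(name):
    # Stand-in for a metric; one of them fails on purpose
    if name == 'metric_two':
        raise ValueError('bad input in %s' % name)
    return [1, 2, 3]

if __name__ == '__main__':
    pool = multiprocessing.Pool()
    result = pool.map_async(calculate, ['metric_one', 'metric_two'])
    pool.close()
    pool.join()
    try:
        metrics_list = result.get()   # re-raises the worker's ValueError here
    except ValueError as exc:
        print('a metric failed: %s' % exc)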