EDIT: Updated with environment information (see first section)
Environment
- Python 2.7
- Ubuntu 16.04
Issue
I have an application which I've simplified into a three-stage process:
- Gather data from multiple data sources (HTTP requests, system info, etc)
- Compute metrics based on this data
- Output these metrics in various formats
Each stage must complete before the next one starts; however, each stage consists of multiple sub-tasks that can run in parallel (I can send off 3 HTTP requests and read system logs while waiting for them to return).
I've divided up the stages into modules and the sub-tasks into submodules, so my project hierarchy looks like so:
+ datasources
|-- __init__.py
|-- data_one.py
|-- data_two.py
|-- data_three.py
+ metrics
|-- __init__.py
|-- metric_one.py
|-- metric_two.py
+ outputs
|-- __init__.py
|-- output_one.py
|-- output_two.py
- app.py
app.py looks roughly like this (pseudo-code for brevity):
```python
import datasources
import metrics
import outputs

for datasource in dir(datasources):
    datasource.refresh()
for metric in dir(metrics):
    metric.calculate()
for output in dir(outputs):
    output.dump()
```
(There's additional code wrapping the dir call to ignore system modules, plus exception handling, etc. -- but this is the gist of it.)
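A minimal sketch of what that wrapper does (illustrative names, not my exact code -- it just filters dir() down to actual module objects):

```python
import types

def submodules(package):
    # Yield the module objects inside a package, skipping
    # dunder attributes and any members that aren't modules.
    for name in dir(package):
        if name.startswith('__'):
            continue
        member = getattr(package, name)
        if isinstance(member, types.ModuleType):
            yield member
```
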
Each datasource sub-module looks roughly like so:
```python
data = []

def refresh():
    # Populate the module-level "data" attribute somehow
    global data
    data = [1, 2, 3]
```
Each metric sub-module looks roughly like so:
import datasources.data_one as data_one
import datasources.data_two as data_two
data = []
def calculate():
# Use the datasources to compute the metric
data = [sum(x) for x in zip(data_one, data_two)]
return
In order to parallelize the first stage (datasources) I wrote something simple like the following:
```python
import threading

def run_thread(datasource):
    datasource.refresh()

threads = []
for datasource in dir(datasources):
    thread = threading.Thread(target=run_thread, args=(datasource,))
    threads.append(thread)
    thread.start()
for thread in threads:
    thread.join()
```
This works: afterwards I can compute any metric, and the datasources.x.data attribute is populated.
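I believe this works because threads share the parent process's memory, so every write to a module-level attribute is visible to the main thread afterwards. A toy version of what I observe (a plain dict stands in for a datasource module's data attribute, so the snippet is self-contained):

```python
import threading

shared = {'data': []}  # stands in for datasources.x.data

def refresh(n):
    # list.append on a shared object is visible to every thread,
    # because all threads share the parent process's memory.
    shared['data'].append(n)

threads = [threading.Thread(target=refresh, args=(n,)) for n in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(sorted(shared['data']))  # -> [0, 1, 2]
```
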
To parallelize the second stage (metrics), which depends less on I/O and more on CPU, I felt that simple threading wouldn't actually speed things up and that I would need the multiprocessing module to take advantage of multiple cores. I wrote the following:
```python
import multiprocessing

def run_pool(calculate):
    calculate()

pool = multiprocessing.Pool()
pool.map(run_pool, [m.calculate for m in dir(metrics)])
pool.close()
pool.join()
```
This code runs for a few seconds (so I think it's working?), but then when I inspect metrics.metric_one.data it returns [], as if calculate() had never run.
It seems that by using the multiprocessing module the workers no longer share the module-level data attribute, presumably because each worker is a separate process with its own memory. How should I rewrite this so that I can compute each metric in parallel, taking advantage of multiple cores, but still have access to the data when it's done?
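Here is a stripped-down reproduction with the metric module's behavior inlined (no package discovery involved, so I'm fairly confident the problem is the process boundary itself rather than my dir() wrapper):

```python
import multiprocessing

data = []

def calculate(_):
    # This assignment runs inside a worker process, so it only
    # rebinds the *child's* copy of the module globals.
    global data
    data = [1, 2, 3]

if __name__ == '__main__':
    pool = multiprocessing.Pool(1)
    pool.map(calculate, [None])
    pool.close()
    pool.join()
    print(data)  # -> [] in the parent, exactly the symptom above
```
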