First, in your original code, I see:

```python
for (x, y) in zip(xs, ys):
    grid = Grid.from_raster(interest, data_name='map')
```
I am not familiar with the `pysheds` module and could not find any documentation on it, so I have no idea whether `Grid.from_raster` is an expensive operation or not. But this statement is a candidate for being moved above the `for` loop instead of being recomputed on every iteration; perhaps this alone would gain significant performance improvements. The link you referred to, What all parameters to be called in a async function in python?, suggests that the overhead of creating a process pool might not compensate enough to be worth the trouble. Also, if `Grid.from_raster` is expensive and profits from being removed from the loop, then a multiprocessing solution in essence "puts it back in the loop" by causing it to be executed once per x, y pair, making it less likely that multiprocessing yields a performance improvement. A minimal sketch of that hoisting follows.
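Here is that sketch (an assumption on my part, since I cannot verify from any documentation that a `Grid` instance can safely be reused across successive `catchment`/`clip_to` calls):

```python
# Sketch: create the grid once, outside the loop. This assumes the Grid
# object can be reused across catchment calls, which I cannot verify.
grid = Grid.from_raster(interest, data_name='map')
for (x, y) in zip(xs, ys):
    grid.catchment(data='map', x=x, y=y, out_name='catch',
                   recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    # ... rest of the per-point processing ...
```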
Anyway, to run your code using the suggested technique, see below. Unfortunately, you cannot run both `process_poi` and `var_loop_async` in the processor pool. But look further below for a different solution.
```python
import numpy as np
from pysheds.grid import Grid
from concurrent.futures import ProcessPoolExecutor
import asyncio

xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = (
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
)

def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch',
                   recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable',
                     window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean

async def var_loop_async(interest, pool, loop):
    # Submit one process-pool task per (x, y) pair:
    tasks = [loop.run_in_executor(pool, process_point, interest, x, y)
             for (x, y) in zip(xs, ys)]
    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        tasks = [var_loop_async(file, pool, loop) for file in file_list]
        results = await asyncio.gather(*tasks)
        print(results)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
```
A Different Solution
You would like to be able to run `var` in a process pool for each file to be processed and then process each x, y pair in a sub-process. This means each sub-process that is processing a file needs its own process pool for the x, y pairs. This is normally impossible because processes created for process pools are daemon processes (they automatically terminate when the main process terminates) and they are not allowed to create their own sub-processes. To overcome this, we must create our own specialization of `multiprocessing.pool.Pool` and initialize each sub-process with its own pool (a short demonstration of the restriction follows).
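To illustrate the restriction, here is a standalone demonstration (not part of the solution itself):

```python
# Demonstration: a daemonic process is not allowed to have children.
import multiprocessing

def worker():
    # This start() call fails inside the daemonic child with
    # "daemonic processes are not allowed to have children":
    multiprocessing.Process(target=print, args=('hello',)).start()

if __name__ == '__main__':
    p = multiprocessing.Process(target=worker, daemon=True)
    p.start()
    p.join()
```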
But will this be a performance improvement? The `var` sub-processes are essentially doing nothing except waiting for the `process_poi` sub-processes to complete their work, so I would not expect this to be much of an improvement over the previous code. And, as I mentioned, it's not clear whether either multiprocessing solution would be an improvement over the original code, especially one modified to hoist the `Grid.from_raster` call out of the loop.
```python
import numpy as np
from pysheds.grid import Grid
import functools
from multiprocessing.pool import Pool
import multiprocessing
import os

# This allows subprocesses to have their own pools:
class NoDaemonProcess(multiprocessing.Process):
    # Make the 'daemon' attribute always return False:
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

class MyPool(Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super().__init__(*args, **kwargs)

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'

pool2 = None

def init_pool():
    # Give each sub-process its own pool for processing the (x, y) pairs:
    global pool2
    pool2 = Pool(os.cpu_count() // 2)  # half the available number of processors

def process_poi(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch',
                   recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable',
                     window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean

def var(interest):
    task = functools.partial(process_poi, interest)
    return pool2.starmap(task, zip(xs, ys))

def main():
    # This creates non-daemon processes so that they can create their own pools:
    with MyPool(2, init_pool) as pool:
        results = pool.map(var, [a, b])
    print(results)

if __name__ == "__main__":
    main()
```
A Third Solution Using Threads
Using `asyncio`:
```python
import numpy as np
from pysheds.grid import Grid
from concurrent.futures import ThreadPoolExecutor
import asyncio

xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = [
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
]

def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch',
                   recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable',
                     window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean

async def var_loop_async(interest, pool, loop):
    # Submit one thread-pool task per (x, y) pair:
    tasks = [loop.run_in_executor(pool, process_point, interest, x, y)
             for (x, y) in zip(xs, ys)]
    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=100) as pool:
        tasks = [var_loop_async(file, pool, loop) for file in file_list]
        results = await asyncio.gather(*tasks)
        print(results)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
```
Alternative:
```python
import numpy as np
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor

xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'

def process_poi(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch',
                   recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable',
                     window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean

def var(executor, interest):
    # Map each (x, y) pair to process_poi on the shared executor:
    return list(executor.map(functools.partial(process_poi, interest), xs, ys))

def main():
    with ThreadPoolExecutor(max_workers=100) as executor:
        results = list(executor.map(functools.partial(var, executor), [a, b]))
        print(results)

if __name__ == "__main__":
    main()
```
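A note on the design choice in this alternative (and in the updated version below): the per-file `var` calls and the per-point `process_poi` calls share a single executor, so the generous `max_workers=100` matters. Each `var` thread blocks until its inner `executor.map` completes; if the pool were small enough that the blocked outer threads occupied every worker, the inner tasks could never be scheduled and the program would deadlock.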
Updated Solution Using Threads Based On OP's Updated Code
```python
import numpy as np
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor

xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = (
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
)

def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch',
                   recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable',
                     window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean

def var(executor, interest):
    # Map each (x, y) pair to process_point on the shared executor:
    return list(executor.map(functools.partial(process_point, interest), xs, ys))

def main():
    with ThreadPoolExecutor(max_workers=100) as executor:
        results = list(executor.map(functools.partial(var, executor), file_list))
        print(results)

if __name__ == "__main__":
    main()
```