  1. I have a function var. I want to know the best possible way to parallelize the loop inside this function (multiprocessing/parallel processing) so that it utilizes all the processors, cores, and RAM the system has.

    import numpy as np
    from pysheds.grid import Grid
    
    xs = 82.1206, 80.8707, 80.8789, 80.8871, 80.88715
    ys = 25.2111, 16.01259, 16.01259, 16.01259, 15.9956
    
    a = r'/home/test/image1.tif'
    b = r'/home/test/image2.tif'
    
    def var(interest):
    
        variable_avg = []
        for (x,y) in zip(xs,ys):
            grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    
            grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label') 
    
            grid.clip_to('catch')
    
            grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    
            variablemask = grid.view('variable', nodata=np.nan)
            variablemask = np.array(variablemask)
            variablemean = np.nanmean(variablemask)
            variable_avg.append(variablemean)
        return variable_avg
    
    
  2. It would be great if I could run both the function var and the loop inside it in parallel for the given multiple arguments of the function, e.g. calling var(a) and var(b) at the same time, since that would take much less time than parallelizing the loop over the coordinates (xs, ys) alone.

The pysheds documentation can be found here.

The data.tif file used in the code at grid = Grid.from_raster(r'/home/data/data.tif', data_name='map') can be downloaded directly from here. The same data can be copied under different names in the directory and used in place of a = r'/home/test/image1.tif' and b = r'/home/test/image2.tif' for testing the code.

To speed up the above code, I got a suggestion here; it is as follows:

import functools
import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

import numpy as np
from pysheds.grid import Grid

def process_poi(interest, x, y):
    grid = Grid.from_raster(interest, data_name='map')

    grid.catchment(data='map', x=x, y=y, out_name='catch')

    variable = grid.view('catch', nodata=np.nan)
    variable = np.array(variable)
    return variable.mean()

async def var_loop_async(interest, pool, loop):
    tasks = []
    for (x,y) in zip(xs,ys):
        function_call = functools.partial(process_poi, interest, x, y)
        tasks.append(loop.run_in_executor(pool, function_call))

    return await asyncio.gather(*tasks)

async def main():
    loop = asyncio.get_event_loop()
    pool_start = time.time()
    tasks = []
    with ProcessPoolExecutor() as pool:
        for _ in range(100):
            tasks.append(var_loop_async(a, pool, loop))
        results = await asyncio.gather(*tasks)
        pool_end = time.time()
        print(f'Process pool took {pool_end-pool_start}')


However, I could not understand how to call the function var_loop_async(interest, pool, loop); in particular, I could not work out what arguments to pass in place of pool and loop.

I am very new to Python programming.

Kindly turn the above suggestion into a reproducible solution, if possible, so that it can be run directly in Python. Or, if you have any better suggestion for speeding up the original code, please let me know.

Gun

1 Answer


First, in your original code, I see:

for (x,y) in zip(xs,ys):
    grid = Grid.from_raster(interest, data_name='map')

I am not familiar with the pysheds module and could not find any documentation on it, so I have no idea whether Grid.from_raster is an expensive operation or not. But that statement looks like a candidate for moving above the for loop instead of being recomputed on every iteration; perhaps this alone would gain a significant performance improvement. The link you referred to, What all parameters to be called in a async function in python?, suggests that the overhead of creating a process pool might not be worth the trouble. Also, if Grid.from_raster is expensive and profits from being hoisted out of the loop, then a multiprocessing solution in essence "puts it back in the loop" by executing it once for each x, y pair, making a performance improvement from multiprocessing even less likely.
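
For illustration, here is a minimal sketch of that change to the original var function. It assumes the Grid object can safely be reused across iterations; if catchment or clip_to mutates state that later points depend on, the load would have to stay inside the loop after all.

def var(interest):
    # Load the source raster once instead of once per point:
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    variable_avg = []
    for (x, y) in zip(xs, ys):
        grid.catchment(data='map', x=x, y=y, out_name='catch',
                       recursionlimit=15000000, xytype='label')
        grid.clip_to('catch')
        grid.read_raster(interest, data_name='variable',
                         window=grid.bbox, window_crs=grid.crs)
        variablemask = grid.view('variable', nodata=np.nan)
        variable_avg.append(np.nanmean(np.array(variablemask)))
    return variable_avg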

Anyway, to run your code using the suggested technique, see below. Unfortunately, you cannot run both process_poi and var_loop_async in the process pool; but look further below for a different solution.

import numpy as np
from pysheds.grid import Grid
from concurrent.futures.process import ProcessPoolExecutor
import asyncio


xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = (
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
)


def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


async def var_loop_async(interest, pool, loop):
    tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
    return await asyncio.gather(*tasks)


async def main():
    loop = asyncio.get_event_loop()
    with ProcessPoolExecutor() as pool:
        tasks = [var_loop_async(file, pool, loop) for file in file_list]
        results = await asyncio.gather(*tasks)
        print(results)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
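
As a side note, on Python 3.7 and later the loop management at the bottom can be replaced by asyncio.run, which creates and closes the event loop for you (and inside main, asyncio.get_running_loop() is the modern equivalent of asyncio.get_event_loop()):

if __name__ == "__main__":
    asyncio.run(main())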

A Different Solution

You would like to run var in a process pool for each file to be processed and then have each x, y pair processed in a sub-process. That means each sub-process that is processing a file needs its own process pool for the x, y pairs. This is normally impossible because the processes created for process pools are daemon processes (they terminate automatically when the main process terminates) and daemon processes are not allowed to create sub-processes of their own. To overcome this, we must create our own specialization of multiprocessing.pool.Pool and initialize each sub-process with its own pool.

But will this be a performance improvement? The var sub-processes essentially do nothing except wait for the process_poi sub-processes to complete their work, so I would not expect this to be much of an improvement over the previous code. And, as mentioned above, it is not clear whether either multiprocessing solution would improve on the original code at all, especially once the original is modified to hoist the Grid.from_raster call.

import numpy as np
from pysheds.grid import Grid
import functools
from multiprocessing.pool import Pool
import multiprocessing
import os

# This allows subprocesses to have their own pools:

class NoDaemonProcess(multiprocessing.Process):
    # make 'daemon' attribute always return False
    def _get_daemon(self):
        return False
    def _set_daemon(self, value):
        pass
    daemon = property(_get_daemon, _set_daemon)

class NoDaemonContext(type(multiprocessing.get_context())):
    Process = NoDaemonProcess

class MyPool(multiprocessing.pool.Pool):
    def __init__(self, *args, **kwargs):
        kwargs['context'] = NoDaemonContext()
        super(MyPool, self).__init__(*args, **kwargs)


xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'


pool2 = None

def init_pool():
    global pool2
    pool2 = Pool(os.cpu_count() // 2)  # half the available number of processors


def process_poi(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


def var(interest):
    task = functools.partial(process_poi, interest)
    return pool2.starmap(task, zip(xs, ys))


def main():
    # This will create non-daemon processes so that these processes can create their own pools:
    with MyPool(2, init_pool) as pool:
        results = pool.map(var, [a, b])
        print(results)


if __name__ == "__main__":
    main()
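
Given the doubts above about whether nested pools pay off, a simpler alternative is to flatten both loops into a single list of (interest, x, y) work items and feed them to one ordinary pool, so no daemon workaround is needed. A minimal sketch reusing the process_poi above (flat_main is a hypothetical name, not part of the original code):

from multiprocessing.pool import Pool

def flat_main():
    # One work item per (file, point) combination, in file-major order:
    combos = [(interest, x, y) for interest in (a, b) for (x, y) in zip(xs, ys)]
    with Pool() as pool:
        flat = pool.starmap(process_poi, combos)
    # Regroup the flat result list into one list of means per file:
    n = len(xs)
    results = [flat[i:i + n] for i in range(0, len(flat), n)]
    print(results)

if __name__ == "__main__":
    flat_main()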

A Third Solution Using Threads

Using asyncio:

import numpy as np
from pysheds.grid import Grid
from concurrent.futures import ThreadPoolExecutor
import asyncio

xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = [
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
]

def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


async def var_loop_async(interest, pool, loop):
    tasks = [loop.run_in_executor(pool, process_point, interest, x, y) for (x, y) in zip(xs, ys)]
    return await asyncio.gather(*tasks)


async def main():
    loop = asyncio.get_event_loop()
    with ThreadPoolExecutor(max_workers=100) as pool:
        tasks = [var_loop_async(file, pool, loop) for file in file_list]
        results = await asyncio.gather(*tasks)
        print(results)


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
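
Whether threads pay off here depends on how much of process_point's time is spent in native code that releases the GIL (file I/O does, as do many NumPy routines; pure-Python work does not). The simplest way to find out on your own data is to time the run, for example with a small harness like this in place of the __main__ block above:

import time

if __name__ == "__main__":
    start = time.perf_counter()
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()
    print(f'Threaded run took {time.perf_counter() - start:.2f} seconds')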

Alternative:

import numpy as np
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor


xs = 82.1206, 72.4542, 65.0431, 83.8056, 35.6744
ys = 25.2111, 17.9458, 13.8844, 10.0833, 24.8306

a = r'/home/test/image1.tif'
b = r'/home/test/image2.tif'



def process_poi(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean


def var(executor, interest):
    return list(executor.map(functools.partial(process_poi, interest), xs, ys))


def main():
    with ThreadPoolExecutor(max_workers=100) as executor:
        results = list(executor.map(functools.partial(var, executor), [a, b]))
        print(results)


if __name__ == "__main__":
    main()
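
One design point worth noting in this variant: var submits its inner process_poi tasks to the same executor it is itself running in. That is safe here only because max_workers=100 far exceeds the number of tasks in flight; with a small pool, the outer var tasks could occupy every worker while blocking on inner tasks that then never get scheduled, deadlocking the pool. A hypothetical lower bound for sizing the pool:

# Each outer var call blocks until its inner process_poi calls finish,
# so outer and inner tasks must all be schedulable at the same time:
n_files = 2                               # a and b
pool_size = n_files + n_files * len(xs)   # 2 outer + 10 inner = 12

with ThreadPoolExecutor(max_workers=pool_size) as executor:
    results = list(executor.map(functools.partial(var, executor), [a, b]))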

Updated Solution Using Threads Based On OP's Updated Code

import numpy as np
from pysheds.grid import Grid
import functools
from concurrent.futures import ThreadPoolExecutor


xs = (82.1206, 72.4542, 65.0431, 83.8056, 35.6744)
ys = (25.2111, 17.9458, 13.8844, 10.0833, 24.8306)

file_list = (
    r'/home/test/image1.tif',
    r'/home/test/image2.tif'
)


def process_point(interest, x, y):
    grid = Grid.from_raster(r'/home/data/data.tif', data_name='map')
    grid.catchment(data='map', x=x, y=y, out_name='catch', recursionlimit=15000000, xytype='label')
    grid.clip_to('catch')
    grid.read_raster(interest, data_name='variable', window=grid.bbox, window_crs=grid.crs)
    variablemask = grid.view('variable', nodata=np.nan)
    variablemask = np.array(variablemask)
    variablemean = np.nanmean(variablemask)
    return variablemean

def var(executor, interest):
    return list(executor.map(functools.partial(process_point, interest), xs, ys))


def main():
    with ThreadPoolExecutor(max_workers=100) as executor:
        results = list(executor.map(functools.partial(var, executor), file_list))
        print(results)


if __name__ == "__main__":
    main()
Booboo