
I am trying to calculate statistics on the features that fall within each grid cell. Because the dataframes are quite large, I am using multiprocessing to speed things up. When I run the code on a sample dataset of only 1000 grid cells I get the expected results; when I run the stats on the full dataset (roughly 4.3 million cells) I get an AssertionError. Looking into this, it appears to be caused by passing more than 2 GB of data between the worker and the main process, which makes sense, as one of my dataframes is 5 GB on disk. Apparently this is a bug that has been fixed, so I moved to Python 3.9 in OSGeo4W, but I got the same error. I then tried to break the work into smaller datasets with array_split and a loop, but still no luck. How can I run this rather large operation?

I tried using gp.overlay but got the same assertion errors. sjoin returns a count of each intersecting feature, and I was able to filter and sjoin, but some tracks cross a cell multiple times and I need a count of each crossing, hence I use geopandas overlay and clip. I also tried pandarallel, but it kept returning the error that clip is not defined.
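For illustration, here is the sjoin-versus-overlay difference in miniature (a toy sketch, not my real layers):

import geopandas as gpd
from shapely.geometry import LineString, box

grid = gpd.GeoDataFrame({'grid_id': [1]}, geometry=[box(0, 0, 1, 1)], crs='EPSG:3857')
# A single track that enters and leaves the cell twice:
tracks = gpd.GeoDataFrame({'MYTYPE': ['CARGO']},
                          geometry=[LineString([(-1, 0.2), (2, 0.2), (2, 0.8), (-1, 0.8)])],
                          crs='EPSG:3857')

# sjoin: one row per intersecting feature, so the track counts once.
print(len(gpd.sjoin(tracks, grid, predicate='intersects')))  # 1

# overlay + explode: one row per crossing, so the track counts twice.
crossings = gpd.overlay(tracks, grid, how='intersection').explode(index_parts=False)
print(len(crossings))  # 2

Here is my code: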

from pandas import concat
from pandarallel import pandarallel
from geopandas import read_file, clip
from time import time as ttime, strftime, gmtime
from datetime import timedelta
from calendar import month_name
from functools import partial
from os import chdir, path, remove
from multiprocessing import Pool
from numpy import array_split

def cell_calc(x, y):
    # x: a chunk of grid cells; y: the tracks frame.
    # Clip the tracks to this chunk and split multi-part results so each
    # crossing becomes its own row.
    df = clip(y.to_crs(x.crs), x, keep_geom_type=True).explode()
    cell = x
    TList = ['CARGO', 'FISHING', 'OTHER', 'PASSENGER', 'TANKER']
    MList = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    cell['ALL'] = len(df)
    for type in TList:
        cell[type] = len(df[df['MYTYPE']==type])
    for month in MList:
        mdf = df[df['MONTH']==month]
        cell[month_name[month]+'_ALL'] = len(mdf)
        for type in TList:
            cell[month_name[month]+'_'+type] = len(mdf[mdf['MYTYPE'] == type])
    return cell

if __name__ == '__main__':
    GRIDDB = r'...\GRID.gdb'
    GRID = 'GRID102001'
    TRACKSDB = r'...\ShipTracks.gdb'
    TRACKS = 'ShipTracks_clip'
    OGDB = r'...\OutDir'
    ODB = '2020.gpkg'

    stime = ttime()
    print('Loading datasets at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    GRIDGDF = read_file(GRIDDB, layer=GRID, driver='FileGDB')
    GRIDGDF = GRIDGDF[['grid_id', 'geometry']]
    TRACKSGDF = read_file(TRACKSDB, layer=TRACKS, driver='FileGDB', rows=1000)
    TRACKSGDF = TRACKSGDF[['MYTYPE', 'MONTH', 'geometry']]
    stime = ttime()
    print('Computing cell statistics ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    func = partial(cell_calc, y=TRACKSGDF)
    p = Pool(processes=16)
    split_dfs = array_split(GRIDGDF, 16)
    pool_results = p.map(func, split_dfs)
    p.close()
    p.join()

    grid = concat(pool_results)

    etime = ttime()
    print('Cell statistics completed at ' + str(strftime('%H:%M:%S', gmtime(ttime() - 14400))) + ' in ' + str(
        (timedelta(seconds=etime - stime))))

    chdir(OGDB)
    grid.to_file(ODB, layer='AIS_GRID', driver='GPKG')

Here is the error:

Process SpawnPoolWorker-31:
Traceback (most recent call last):
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\process.py", line 315, in _bootstrap
    self.run()
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\pool.py", line 114, in worker
    task = get()
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\queues.py", line 366, in get
    res = self._reader.recv_bytes()
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 221, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 323, in _recv_bytes
    return self._get_more_data(ov, maxsize)
  File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 342, in _get_more_data
    assert left > 0
AssertionError
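The failing assert left > 0 lives in the Windows pipe transport of multiprocessing.connection and trips when a single pickled message is too large for the pipe. A quick way to check whether an object will exceed the limit before handing it to the pool (a minimal sketch, using the TRACKSGDF frame from above):

import pickle

payload = pickle.dumps(TRACKSGDF, protocol=pickle.HIGHEST_PROTOCOL)
print(f'{len(payload) / 1024**3:.2f} GiB pickled')  # anything near 2 GiB will break the pipe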

I added this loop to try to break the data up even more...

    count = 0
    results = []
    for grid in array_split(GRIDGDF, 10):
        count = count + 1
        stime = ttime()
        # Pre-clip the tracks to this section of the grid so less data is
        # pickled over to the workers.
        tracks = clip(TRACKSGDF, grid)

        print('Computing cell statistics for ' + str(count) + ' of 10 at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
        func = partial(cell_calc, y=tracks)
        p = Pool(processes=16)
        split_dfs = array_split(grid, 16)
        pool_results = p.map(func, split_dfs)
        p.close()
        p.join()

        section = concat(pool_results, axis=0)

        etime = ttime()
        print('Cell statistics completed for ' + str(count) + ' of 10 at ' + str(strftime('%H:%M:%S', gmtime(etime - 14400))) + ' in ' + str(timedelta(seconds=etime - stime)))
        results.append(section)
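(For completeness: one way to avoid pushing the big frame through the pipe at all is to have each worker load the tracks itself via a pool initializer. This is a hypothetical sketch, not one of the attempts above; init_worker and cell_calc_worker are made-up names.)

from multiprocessing import Pool
from numpy import array_split

TRACKS_WORKER = None

def init_worker(tracksdb, layer):
    # Runs once in each worker process: load the tracks locally instead of
    # receiving a multi-gigabyte pickle from the parent process.
    global TRACKS_WORKER
    from geopandas import read_file
    TRACKS_WORKER = read_file(tracksdb, layer=layer)

def cell_calc_worker(grid_chunk):
    return cell_calc(grid_chunk, TRACKS_WORKER)

if __name__ == '__main__':
    with Pool(16, initializer=init_worker, initargs=(TRACKSDB, TRACKS)) as p:
        pool_results = p.map(cell_calc_worker, array_split(GRIDGDF, 16))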
MrKingsley
  • (Tangentially, code you put in `if __name__ == '__main__':` should be trivial; the purpose of this boilerplate is to allow you to `import` the code, which you will not want to do anyway if the logic you need is not available via `import`. See also https://stackoverflow.com/a/69778466/874188) – tripleee Mar 20 '23 at 16:19
  • PyCharm starts all new scripts with `if __name__ == '__main__':`, and in order to call the pool using `with mp.Pool() as pool:` I think you need the `if __name__ == '__main__':` guard, which is how I was originally trying to run the multiprocessing. If it is, as you say, "trivial", why mention it? Would removing it solve my issue? – MrKingsley Mar 20 '23 at 16:38
  • What I'm trying to say is that the code in `if __name__ == '__main__':` should not be this complex. The linked answer explains this in more detail. – tripleee Mar 20 '23 at 16:42
  • I'm sorry I don't understand... – MrKingsley Mar 20 '23 at 16:59

1 Answer


After some testing I learned that speeding up an overlay with the multiprocessing module works better when you split up the target (TRACKS) as opposed to the mask (GRID). So I changed the code to loop over larger sections of the grid, and inside the grid_calc() function I run the overlay with a pool of workers mapped over an array_split of the target data. I also switched to pivot_table, which is much faster than the for loops for counting the records of each month and type.
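The counting pattern in isolation looks like this (a toy sketch with made-up data, not the real layers):

import pandas as pd

tracks = pd.DataFrame({
    'grid_id': [1, 1, 1, 2],
    'MONTH':   [1, 1, 2, 1],
    'MYTYPE':  ['CARGO', 'CARGO', 'TANKER', 'FISHING'],
})

# One row per (cell, month, type) with its count, then pivot wide so every
# month/type combination becomes a column; missing combinations fill with 0.
counts = tracks.groupby(['grid_id', 'MONTH', 'MYTYPE']).size().reset_index(name='COUNT')
wide = counts.pivot_table(values='COUNT', index='grid_id',
                          columns=['MONTH', 'MYTYPE'], aggfunc='sum', fill_value=0)
print(wide)

The full script: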

def grid_calc(grid, tracks):
    stime = ttime()
    print('Starting Overlay at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    func = partial(overlay, df2=grid)
    trackList = []
    with Pool(16) as pool:
        for fresults in pool.map(func, array_split(tracks, 16)):
            trackList.append(fresults)
    tracks = concat(trackList)
    # tracks = overlay(tracks, grid, how='intersection').explode()
    etime = ttime()
    print('Overlay completed at ' + str(strftime('%H:%M:%S', gmtime(etime - 14400))) + ' in ' + str(
        (timedelta(seconds=etime - stime))))
    stime = ttime()
    print('Group and Pivot started at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    j1 = tracks.groupby(['grid_id']).size().reset_index(name='ALL')
    j1 = pivot_table(j1, values='ALL', index='grid_id', aggfunc=np_sum, fill_value=0)

    j2 = tracks.groupby('grid_id').size().reset_index(name='COUNT')
    j2 = pivot_table(j2, values='COUNT', index='grid_id', columns='MYTYPE', aggfunc=np_sum, fill_value=0)

    j3 = tracks.groupby(['grid_id', 'MONTH']).size().reset_index(name='COUNT')
    j3['cols'] = j3.apply(lambda x: month_name[x['MONTH']].upper() + '_ALL', axis=1)
    j3 = pivot_table(j3, values='COUNT', index='grid_id', columns='cols', aggfunc=np_sum, fill_value=0)

    j4 = tracks.groupby(['grid_id', 'MONTH', 'MYTYPE']).size().reset_index(name='COUNT')
    j4['cols'] = j4.apply(lambda x: month_name[x['MONTH']].upper() + '_' + x['MYTYPE'], axis=1)
    j4 = pivot_table(j4, values='COUNT', index='grid_id', columns='cols', aggfunc=np_sum, fill_value=0)
    jtables = [j1, j2, j3, j4]
    fgrid = grid
    for jtable in jtables:
        # pivot_table leaves grid_id as the index; restore it as a column before merging.
        fgrid = merge(fgrid, jtable.reset_index(), how='left', on='grid_id').fillna(0)

    etime = ttime()
    print('Group and Pivot complete at ' + str(strftime('%H:%M:%S', gmtime(etime - 14400))) + ' in ' + str(
        (timedelta(seconds=etime - stime))))
    return fgrid
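
# Note on the pooled overlay above: partial(overlay, df2=grid) fixes the mask,
# so each call the pool makes is overlay(track_chunk, grid) (how defaults to
# 'intersection') on one sixteenth of the tracks. Only a chunk of the tracks
# crosses the pipe at a time, never the whole frame.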


if __name__ == '__main__':
    print('Importing modules and components...')
    from pandas import concat, pivot_table, merge
    from pandarallel import pandarallel
    from geopandas import read_file, overlay, GeoDataFrame
    from time import time as ttime, strftime, gmtime
    from datetime import timedelta
    from calendar import month_name
    from functools import partial
    from os import chdir
    from multiprocessing import Pool
    from numpy import array_split, sum as np_sum
    print('Imports complete')

    GRIDDB = r'...\GRID.gdb'
    GRID = 'GRID'
    TRACKSDB = r'...\Track.gdb'
    TRACKS = 'TRACKS'
    OGDB = r'...\OutDir'  # output directory
    ODB = '...'           # output GeoPackage / geodatabase

    stime = ttime()
    print('Loading datasets at ' + str(strftime('%H:%M:%S', gmtime(stime - 14400))))
    GRIDGDF = read_file(GRIDDB, layer=GRID, driver='FileGDB')
    GRIDGDF = GRIDGDF[['grid_id', 'geometry']]
    TRACKSGDF = read_file(TRACKSDB, layer=TRACKS, driver='FileGDB')
    TRACKSGDF = TRACKSGDF[TRACKSGDF.geom_type == 'MultiLineString']

    print('Breaking up big dataset into smaller data pools...')
    # Process the grid in sections of roughly 360,000 cells each.
    x = max(1, len(GRIDGDF) // 360000)

    fgrids = []
    for grid in array_split(GRIDGDF, x):
        df1 = grid_calc(grid, TRACKSGDF.to_crs(grid.crs))
        fgrids.append(df1)
        del df1

    chdir(OGDB)
    # arrange_cols is a predefined list giving the desired output column order.
    if len(fgrids) < 2:
        fgrid = fgrids[0][arrange_cols]
    else:
        fgrid = concat(fgrids)[arrange_cols]
    fgrid.to_file(ODB, layer='FINAL_GRID_COUNTS', driver='GPKG')

I'm not sure exactly how this resolves the AssertionError, whether the rewrite fixed a code issue or this approach simply handles the PC's resources better; presumably, splitting the target keeps every pickled message passed between the processes under the roughly 2 GiB pipe limit.

MrKingsley