I am trying to calculate statistics on features that fall within each cell of a grid. Because the dataframes are quite large, I am using multiprocessing to speed up the process. When I run the code on a sample dataset of only 1000 grid cells I get the expected results; when I try to run the stats on the full dataset (roughly 4.3 million cells), I get an AssertionError. Looking into this, it appears to be caused by trying to pass more than 2 GB of data from a worker back to the main process, which makes sense: one of my dataframes is 5 GB on disk. Apparently this was a bug that has since been fixed, so I moved to Python 3.9 in OSGeo4W, but I got the same error. I then tried to break the work into smaller datasets with array_split and a loop, but still no luck.

How can I perform this rather large operation? I tried geopandas overlay but got the same assertion errors. sjoin returns a count of each intersecting feature, and I was able to filter and sjoin, but some tracks cross a cell multiple times and I need a count of each crossing, hence the use of geopandas overlay and clip. I also tried pandarallel, but it kept returning an error that clip was not defined.
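To illustrate why I count crossings with clip rather than sjoin: clipping a track to a cell yields one geometry part per crossing, so exploding the result and counting rows gives the number of crossings. A minimal illustration with bare shapely (the coordinates are made up):

```python
from shapely.geometry import LineString, box

# a 10x10 grid cell
cell = box(0, 0, 10, 10)
# a track that crosses the cell, leaves it, and crosses it again
track = LineString([(-5, 2), (15, 2), (15, 8), (-5, 8)])
# clipping the track to the cell gives one line part per crossing
clipped = track.intersection(cell)
parts = list(clipped.geoms)  # MultiLineString -> one part per crossing
print(len(parts))  # 2 crossings, where sjoin would report a single match
```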
from pandas import concat
from geopandas import read_file, clip
from time import time as ttime, strftime, gmtime
from datetime import timedelta
from calendar import month_name
from functools import partial
from os import chdir
from multiprocessing import Pool
from numpy import array_split


def cell_calc(x, y):
    # Clip the tracks to this block of cells and explode so each
    # crossing becomes its own row and can be counted separately
    df = clip(y.to_crs(x.crs), x, keep_geom_type=True).explode()
    cell = x
    TList = ['CARGO', 'FISHING', 'OTHER', 'PASSENGER', 'TANKER']
    MList = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
    cell['ALL'] = len(df)
    for ship_type in TList:
        cell[ship_type] = len(df[df['MYTYPE'] == ship_type])
    for month in MList:
        mdf = df[df['MONTH'] == month]
        cell[month_name[month] + '_ALL'] = len(mdf)
        for ship_type in TList:
            cell[month_name[month] + '_' + ship_type] = len(mdf[mdf['MYTYPE'] == ship_type])
    return cell


if __name__ == '__main__':
    GRIDDB = r'...\GRID.gdb'
    GRID = 'GRID102001'
    TRACKSDB = r'...\ShipTracks.gdb'
    TRACKS = 'ShipTracks_clip'
    OGDB = r'...\OutDir'
    ODB = '2020.gpkg'
    stime = ttime()
    # gmtime(stime - 14400) shifts UTC to local time (UTC-4)
    print('Loading datasets at ' + strftime('%H:%M:%S', gmtime(stime - 14400)))
    GRIDGDF = read_file(GRIDDB, layer=GRID)
    GRIDGDF = GRIDGDF[['grid_id', 'geometry']]
    # rows=1000 limits the read while testing; remove it for the full run
    TRACKSGDF = read_file(TRACKSDB, layer=TRACKS, driver='FileGDB', rows=1000)
    TRACKSGDF = TRACKSGDF[['MYTYPE', 'MONTH', 'geometry']]
    stime = ttime()
    print('Computing cell statistics at ' + strftime('%H:%M:%S', gmtime(stime - 14400)))
    func = partial(cell_calc, y=TRACKSGDF)
    p = Pool(processes=16)
    split_dfs = array_split(GRIDGDF, 16)
    pool_results = p.map(func, split_dfs)
    p.close()
    p.join()
    grid = concat(pool_results)
    etime = ttime()
    print('Cell statistics completed at ' + strftime('%H:%M:%S', gmtime(etime - 14400))
          + ' in ' + str(timedelta(seconds=etime - stime)))
    chdir(OGDB)
    grid.to_file(ODB, layer='AIS_GRID', driver='GPKG')
Here is the error:
AssertionError
Process SpawnPoolWorker-31:
Traceback (most recent call last):
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\process.py", line 315, in _bootstrap
self.run()
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\process.py", line 108, in run
self._target(*self._args, **self._kwargs)
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\pool.py", line 114, in worker
task = get()
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\queues.py", line 366, in get
res = self._reader.recv_bytes()
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 221, in recv_bytes
buf = self._recv_bytes(maxlength)
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 323, in _recv_bytes
return self._get_more_data(ov, maxsize)
File "...\QGIS 3.22.0\apps\Python39\lib\multiprocessing\connection.py", line 342, in _get_more_data
assert left > 0
I added this loop to try to break the data up even more:
results = []
count = 0
for grid_chunk in array_split(GRIDGDF, 10):
    count += 1
    stime = ttime()
    # pre-clip the tracks to this chunk so each worker handles less data
    tracks = clip(TRACKSGDF, grid_chunk)
    print('Computing cell statistics for ' + str(count) + ' of 10 at '
          + strftime('%H:%M:%S', gmtime(stime - 14400)))
    func = partial(cell_calc, y=tracks)
    p = Pool(processes=16)
    split_dfs = array_split(grid_chunk, 16)
    pool_results = p.map(func, split_dfs)
    p.close()
    p.join()
    chunk_result = concat(pool_results, axis=0)
    etime = ttime()
    print('Cell statistics completed for ' + str(count) + ' of 10 at '
          + strftime('%H:%M:%S', gmtime(etime - 14400)) + ' in '
          + str(timedelta(seconds=etime - stime)))
    results.append(chunk_result)