Action
Reading multiple LAZ point cloud files into a Dask DataFrame.
Problem
Unzipping LAZ (compressed) to LAS (uncompressed) requires a lot of memory. The varying file sizes, combined with the multiple processes created by Dask, result in MemoryErrors.
Attempts
I tried limiting the number of workers following the guide, but it does not seem to work.
from distributed import Client, LocalCluster
import dask.dataframe as dd

# Three worker processes, so at most three files are decompressed at a time.
cluster = LocalCluster(n_workers=3)
client = Client(cluster)

# load, lasfiles, meta and part_size are defined as in the Example below.
dfs = [load(file) for file in lasfiles]
df = dd.from_delayed(dfs, meta=meta)
df = df.repartition(npartitions=len(df) // part_size)
df.to_parquet('/raw', compression='GZIP')
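A variation I have been considering (sketch only; the threads_per_worker=1 and memory_limit='4GB' values below are assumptions, not measured requirements) is to also make each worker single-threaded and give it an explicit memory budget, so that no more than three decompressions can ever be in flight:

from distributed import Client, LocalCluster

# Three single-threaded worker processes, each with an assumed 4GB memory budget,
# so at most three LAZ files are decompressed at any time.
cluster = LocalCluster(n_workers=3, threads_per_worker=1, memory_limit='4GB')
client = Client(cluster)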
Question
How do I go about loading such a large amount of data in a non-standard format?
Example
The following example is my current implementation. It groups the input files into batches of 5 to cap the number of parallel decompression processes at 5, then repartitions the result and writes it to Parquet to enable further processing. To me this implementation seems to miss the point of Dask entirely.
from pathlib import Path

from laspy.file import File
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed


@delayed
def load(file):
    # Decompress one LAZ file and return its point records as a pandas DataFrame.
    with File(file.as_posix(), mode='r') as las_data:
        las_df = pd.DataFrame(las_data.points['point'], dtype=float)
        return las_df


# Expected schema of the delayed partitions.
meta = pd.DataFrame(np.empty(0, dtype=[('X', float), ('Y', float), ('Z', float),
                                       ('intensity', float), ('raw_classification', int)]))

lasfile_dir = Path('/data/las/')
lasfiles = sorted(lasfile_dir.glob('*.laz'))
part_size = 5000000

# Handle the files in batches of 5, so at most 5 decompressions run in parallel.
for idx, sublasfiles in enumerate([lasfiles[i:i+5] for i in range(0, len(lasfiles), 5)]):
    try:
        dfs = [load(file) for file in sublasfiles]
        df = dd.from_delayed(dfs, meta=meta)
        # len(df) triggers a computation over the batch before repartitioning.
        df = df.repartition(npartitions=max(len(df) // part_size, 1))
        df.to_parquet('/data/las/parquet/' + str(idx), compression='GZIP')
    except MemoryError:
        # Note batches that still run out of memory and continue with the next one.
        print('MemoryError on batch', idx)
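An alternative I have been sketching (not verified; the output layout /data/las/parquet/<file stem>.parquet and the worker counts are assumptions for illustration) is to drop the manual batching and make each file a single end-to-end delayed task, letting a small LocalCluster bound how many decompressions run at once:

from pathlib import Path

import dask
import pandas as pd
from dask.delayed import delayed
from distributed import Client, LocalCluster
from laspy.file import File


@delayed
def laz_to_parquet(file, out_dir):
    # One self-contained task per LAZ file: decompress, build the frame, write Parquet.
    with File(file.as_posix(), mode='r') as las_data:
        df = pd.DataFrame(las_data.points['point'], dtype=float)
    df.to_parquet(Path(out_dir) / (file.stem + '.parquet'), compression='gzip')


if __name__ == '__main__':
    # Five single-threaded workers mean at most five files are decompressed at once.
    client = Client(LocalCluster(n_workers=5, threads_per_worker=1))
    out_dir = Path('/data/las/parquet')
    out_dir.mkdir(parents=True, exist_ok=True)
    lasfiles = sorted(Path('/data/las/').glob('*.laz'))
    tasks = [laz_to_parquet(f, out_dir) for f in lasfiles]
    dask.compute(*tasks)

This keeps per-task memory at roughly the size of one decompressed file, and the per-file Parquet output could then be read back with dd.read_parquet for the repartitioning step, but I am not sure it is the intended Dask pattern either.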