
Action: Reading multiple LAZ point cloud files into a Dask DataFrame.

Problem: Decompressing LAZ (compressed) to LAS (uncompressed) requires a lot of memory. Varying file sizes combined with the multiple processes created by Dask result in MemoryErrors.

Attempts

I tried limiting the number of workers following the guide, but it does not seem to work.

from distributed import Client, LocalCluster
import dask.dataframe as dd

# load, lasfiles, meta and part_size are defined as in the example below
cluster = LocalCluster(n_workers=3)
client = Client(cluster)

dfs = [load(file) for file in lasfiles]
df = dd.from_delayed(dfs, meta=meta)
df = df.repartition(npartitions=len(df) // part_size)
df.to_parquet('/raw', compression='GZIP')

Question: How to go about loading such a large amount of data in a non-standard format?

Example

The following example is my current implementation. It groups the input files into batches of 5 to allow at most 5 parallel decompression processes, then repartitions and writes to Parquet to enable further processing. To me this implementation seems to completely miss the point of Dask.

from pathlib import Path

from laspy.file import File
import numpy as np
import pandas as pd
import dask.dataframe as dd
from dask.delayed import delayed

@delayed
def load(file):
    # Decompress a single LAZ file and return its point records as a pandas DataFrame
    with File(file.as_posix(), mode='r') as las_data:
        las_df = pd.DataFrame(las_data.points['point'], dtype=float)
        return las_df

# Empty DataFrame describing the partition schema for dd.from_delayed
meta = pd.DataFrame(np.empty(0, dtype=[('X', float), ('Y', float), ('Z', float),
                                       ('intensity', float), ('raw_classification', int)]))

lasfile_dir = Path('/data/las/')
lasfiles = sorted(list(lasfile_dir.glob('*.laz')))

part_size = 5000000  # target number of points per partition

# Process the files in batches of 5 to cap the number of parallel decompressions
for idx, sublasfiles in enumerate([lasfiles[i:i+5] for i in range(0, len(lasfiles), 5)]):
    try:
        dfs = [load(file) for file in sublasfiles]
        df = dd.from_delayed(dfs, meta=meta)
        df = df.repartition(npartitions=len(df) // part_size)
        df.to_parquet('/data/las/parquet/'+str(idx), compression='GZIP')
    except MemoryError:
        # Skip batches that still exceed available memory
        print('MemoryError while writing batch', idx)
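
For the "further processing" part, a minimal sketch of reading the per-batch Parquet output back into a single Dask DataFrame (directory layout assumed from the example above; the ground-point filter is only an illustration):

from pathlib import Path
import dask.dataframe as dd

# Each batch was written to its own '/data/las/parquet/<idx>' dataset,
# so read them back individually and concatenate into one Dask DataFrame.
parquet_dirs = sorted(Path('/data/las/parquet/').iterdir())
df = dd.concat([dd.read_parquet(d.as_posix()) for d in parquet_dirs])

# Downstream work now runs against Parquet instead of LAZ, for example
# selecting ground-classified points (class 2 in the LAS specification).
ground = df[df['raw_classification'] == 2][['X', 'Y', 'Z']]
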
Tom Hemmes
  • I have a similar use case; can you describe the goal of transferring the lidar data to the Parquet format? – bw4sz Mar 20 '18 at 16:55
  • The goal was to be able to read and write the data quickly during processing. The memory limitations for reading LAZ (as described in this question) are no issue with the Parquet format. – Tom Hemmes Mar 22 '18 at 08:26

1 Answer


Your implementation seems mostly fine to me.

The one thing I would change here is that I would avoid the call to len(df), which will force a computation of the entire dataframe (there is no way to determine the length of the dataframe without reading through all of the files).

Just to be clear, Dask will not be able to parallelize within your load function (it has no concept of LAZ files), so your parallelism will be limited by the number of files that you have.
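
To illustrate the suggestion, here is a minimal sketch that reuses `load`, `lasfiles` and `meta` from the question and derives the partition count from the compressed file sizes instead of `len(df)`; the bytes-per-partition target and the assumption that compressed size roughly tracks point count are mine, not part of this answer:

import dask.dataframe as dd

target_bytes = 500 * 1024 ** 2    # assumed partition size target; tune for available RAM

for idx, sublasfiles in enumerate([lasfiles[i:i+5] for i in range(0, len(lasfiles), 5)]):
    dfs = [load(file) for file in sublasfiles]
    # Estimate npartitions from the on-disk .laz sizes; nothing is computed
    # on the Dask DataFrame just to choose a partition count.
    total_bytes = sum(file.stat().st_size for file in sublasfiles)
    df = dd.from_delayed(dfs, meta=meta)
    df = df.repartition(npartitions=max(1, int(total_bytes // target_bytes)))
    df.to_parquet('/data/las/parquet/' + str(idx), compression='GZIP')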

MRocklin
  • Thank you for your help! I will try to remove the `len(df)` in my current implementation and account for the largest file. However, I don't yet understand why I cannot limit the amount of workers to prevent the default 8 workers all loading a LAZ, given that `8 * LAZ > Memory`. I would prefer using the implementation as attempted. – Tom Hemmes Dec 06 '17 at 13:01
  • "However, I don't yet understand why I cannot limit the amount of workers to prevent the default 8 workers all loading a LAZ," I don't understand what you mean by this. – MRocklin Dec 06 '17 at 14:44
  • 1
    When I run the code as previewed in **attempt** (not the example) I can see 8 running processes of LAZ unzipping. I expected this to be 3 processes at maximum, because I set `n_workers = 3`. However, I just discovered I have to set it to `threads` and limit those as well. – Tom Hemmes Dec 07 '17 at 08:40
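
For completeness, a minimal sketch of the configuration the last comment arrives at: limit both the number of worker processes and the threads per worker, so no more than three LAZ files are decompressed at once (the `memory_limit` value is an assumption, not from the thread):

from distributed import Client, LocalCluster

# 3 single-threaded worker processes -> at most 3 concurrent decompressions
cluster = LocalCluster(n_workers=3,
                       threads_per_worker=1,
                       processes=True,
                       memory_limit='8GB')  # assumed per-worker cap; tune for the machine
client = Client(cluster)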