I have thousands of CSV files which, using Dask, I have repartitioned and converted to Parquet. So I have a Parquet dataset with 100 partitions, but now I want to read it back in and write out one Parquet file per symbol (stock data).
This post, "Dask dataframe split partitions based on a column or function", made me think that setting the index was the right thing to do.
Setup
I'm running this on an AWS m5.24xlarge instance, as I couldn't get a cluster to work (another post I'll have to make), and I'm using JupyterLab through an SSH tunnel. Everything is a very recent install:
dask 2021.8.0 pyhd3eb1b0_0
dask-core 2021.8.0 pyhd3eb1b0_0
distributed 2021.8.0 py39h06a4308_0
pandas 1.3.1 py39h8c16a72_0
python 3.9.6 h12debd9_0
My code is essentially this:
import s3fs
import pandas as pd
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client
client = Client(n_workers=48, threads_per_worker=1, processes=True)
client
PARQUET_WORKING = '../parquet-work/'
TEST_PARQUET = PARQUET_WORKING + '/new_options_parquet/new_option_data_2017.parquet.brotli'
test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet = test_parquet.set_index('UnderlyingSymbol')
test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')
If I check test_parquet.npartitions I get 100. Additionally, there are 4702 unique symbols in the UnderlyingSymbol column.
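(For what it's worth, both numbers can be confirmed with something like this, run on the dataframe as read and before calling set_index, since set_index moves UnderlyingSymbol into the index:)
ddf = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
print(ddf.npartitions)                              # 100
print(ddf['UnderlyingSymbol'].nunique().compute())  # 4702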
When I run the above code I get:
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker process still alive after 3 seconds, killing
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
... (the last two lines repeat many more times) ...
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
---------------------------------------------------------------------------
ValueError Traceback (most recent call last)
<ipython-input-4-814095686328> in <module>
4 test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
5 test_parquet = test_parquet.set_index('UnderlyingSymbol')
----> 6 test_parquet.to_parquet(PARQUET_WORKING + 'test_index_write.parquet.snappy', compression='snappy', engine='pyarrow')
~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/core.py in to_parquet(self, path, *args, **kwargs)
4438 from .io import to_parquet
4439
-> 4440 return to_parquet(self, path, *args, **kwargs)
4441
4442 def to_orc(self, path, *args, **kwargs):
~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/dataframe/io/parquet/core.py in to_parquet(df, path, engine, compression, write_index, append, overwrite, ignore_divisions, partition_on, storage_options, custom_metadata, write_metadata_file, compute, compute_kwargs, schema, **kwargs)
717 if compute:
718 if write_metadata_file:
--> 719 return compute_as_if_collection(
720 DataFrame, graph, (final_name, 0), **compute_kwargs
721 )
~/miniconda3/envs/ds2/lib/python3.9/site-packages/dask/base.py in compute_as_if_collection(cls, dsk, keys, scheduler, get, **kwargs)
311 schedule = get_scheduler(scheduler=scheduler, cls=cls, get=get)
312 dsk2 = optimization_function(cls)(dsk, keys, **kwargs)
--> 313 return schedule(dsk2, keys, **kwargs)
314
315
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in get(self, dsk, keys, workers, allow_other_workers, resources, sync, asynchronous, direct, retries, priority, fifo_timeout, actors, **kwargs)
2669 should_rejoin = False
2670 try:
-> 2671 results = self.gather(packed, asynchronous=asynchronous, direct=direct)
2672 finally:
2673 for f in futures.values():
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in gather(self, futures, errors, direct, asynchronous)
1946 else:
1947 local_worker = None
-> 1948 return self.sync(
1949 self._gather,
1950 futures,
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in sync(self, func, asynchronous, callback_timeout, *args, **kwargs)
843 return future
844 else:
--> 845 return sync(
846 self.loop, func, *args, callback_timeout=callback_timeout, **kwargs
847 )
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in sync(loop, func, callback_timeout, *args, **kwargs)
323 if error[0]:
324 typ, exc, tb = error[0]
--> 325 raise exc.with_traceback(tb)
326 else:
327 return result[0]
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/utils.py in f()
306 if callback_timeout is not None:
307 future = asyncio.wait_for(future, callback_timeout)
--> 308 result[0] = yield future
309 except Exception:
310 error[0] = sys.exc_info()
~/miniconda3/envs/ds2/lib/python3.9/site-packages/tornado/gen.py in run(self)
760
761 try:
--> 762 value = future.result()
763 except Exception:
764 exc_info = sys.exc_info()
~/miniconda3/envs/ds2/lib/python3.9/site-packages/distributed/client.py in _gather(self, futures, errors, direct, local_worker)
1811 exc = CancelledError(key)
1812 else:
-> 1813 raise exception.with_traceback(traceback)
1814 raise exc
1815 if errors == "skip":
ValueError: Could not find dependent ('group-shuffle-0-2eb6f1e40148076067c9f27b831be488', (5, 2)). Check worker logs
distributed.nanny - WARNING - Restarting worker
distributed.nanny - WARNING - Worker exceeded 95% memory budget. Restarting
distributed.nanny - WARNING - Restarting worker
I am not sure where to check "worker logs".
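My best guess is pulling them through the client, something like the snippet below, but I am not sure that is what the message is referring to:
# my guess at how to read the worker logs from the running client
for addr, lines in client.get_worker_logs().items():
    print(addr)
    for level, msg in lines:
        print('  ', level, msg)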
This feels like something fairly simple that should just "work", yet I have spent a lot of time on it, so I must be doing something wrong.
Additionally, I have tried this:
test_parquet = dd.read_parquet(TEST_PARQUET, engine='pyarrow')
test_parquet.to_parquet(PARQUET_WORKING + 'test_2017_symbol.parquet.brotli',
compression='brotli',
partition_on='UnderlyingSymbol')
And I basically get the desired result, except that each resulting per-symbol directory contains 100 small files (one per original partition), and the data per symbol is now small enough that I'd prefer a single file. That is why I am trying the set_index method above, but now I want to know what the "right" way to do this is.
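To make the goal concrete, the end result I'm after is what this naive single-machine loop would produce (shown only to describe the target layout; the real data is far too large to do this in pandas, and the per_symbol output path is just a placeholder):
import os
import pandas as pd

os.makedirs(PARQUET_WORKING + 'per_symbol', exist_ok=True)
df = pd.read_parquet(TEST_PARQUET)  # illustration only; would not fit in memory for real
for symbol, group in df.groupby('UnderlyingSymbol'):
    group.to_parquet(
        f'{PARQUET_WORKING}per_symbol/{symbol}.parquet.brotli',
        compression='brotli',
    )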