I am trying to clean and filter a large dataset I receive each month. The data has recently grown even larger for a variety of reasons, and I can no longer process it with pandas. I've been looking for an alternative, and so far Dask has seemed to work until it comes to the export step. My simplified code is:
import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da
MAP = pd.read_excel('map.xlsx')  # assuming this is really an Excel file, since Dask has no read_excel
MAP = dd.from_pandas(MAP, npartitions=1)
MAP2 = dd.read_csv('map2.csv')
MAP3 = dd.read_csv('map3.csv')
MAP = dd.merge(
    MAP,
    MAP2,
    how="left",
    left_on=("id1", "id2", "id3"),
    right_on=("id1", "id2", "id3"),
    indicator=False)
MAP = MAP.drop_duplicates(subset=["id1", "id2", "id3", "col1", "col2"])
BIG_DATA = dd.read_csv("BIG_DATA.gz",
                       sep='|',
                       compression='gzip',
                       header=None,
                       blocksize=None,
                       dtype={0: np.int64, 1: np.int64, 4: np.int16, 6: str, 8: np.int64,
                              9: np.float32, 10: np.float32, 11: np.float32, 19: str,
                              32: str, 37: np.int32, 40: np.float32})
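# note: gzip is not splittable, so blocksize=None is required here and the whole
# compressed file is read back as a single in-memory partition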
BIG_DATA = dd.merge(
    BIG_DATA,
    MAP3,
    how="left",
    left_on=("id3", "id4"),
    right_on=("id3", "id4"),
    indicator=False)
BIG_DATA = BIG_DATA[filter condition]
groupvars = [g1, g2, g3, g4, g5, g6, g7...g17]
sumlist = [s1, s2, s3, s4]
BIG_DATA = BIG_DATA.groupby(groupvars)[sumlist].sum().reset_index()
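# still lazy at this point -- groupby().sum() only adds tasks to the graph;
# nothing is actually computed until an output step like to_csv()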
BIG_DATA = dd.merge(
    BIG_DATA,
    MAP,
    how="outer",
    left_on=("id1", "id2"),
    right_on=("id1", "id2"),
    indicator=True)
BIG_DATA = BIG_DATA[~BIG_DATA['_merge'].isin(['right_only'])]
BIG_DATA1 = BIG_DATA[filter condition1]
BIG_DATA2 = BIG_DATA[filter condition2]
OUTPUT = dd.concat([BIG_DATA1, BIG_DATA2]).reset_index()
OUTPUT = OUTPUT.repartition(npartitions=100000)  # have tried multiple values here
OUTPUT.to_csv(r'\\files\User\test.csv', single_file=True)
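# to_csv is the first call that actually executes the whole graph above;
# single_file=True also writes every partition through one sequential writer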
When using pandas, this process crashes at the groupby statement. I thought Dask might be the way around this, but it always seems to fail when I try to export to CSV. I'm new to Python and Dask, but I'm guessing it delays the groupby until the export and then fails for the same reason pandas did? I've produced the same result set in Fortran, and it comes out to a roughly 100 MB CSV file with approximately 600k rows of data. I'm not really sure how to go about changing this so that it will work.
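To check my mental model of the laziness, here is a minimal toy sketch (the frame and column names are made up, not from my real data):

import pandas as pd
import dask.dataframe as dd

# tiny made-up frame purely to illustrate lazy evaluation
toy = dd.from_pandas(pd.DataFrame({'g': [1, 1, 2], 'x': [1.0, 2.0, 3.0]}),
                     npartitions=1)
agg = toy.groupby('g')['x'].sum()  # builds a task graph, computes nothing yet
print(type(agg))                   # still a lazy Dask Series, not a result
result = agg.compute()             # only here does real work (and memory use) happen

If that model is right, the export step is simply where everything upstream, including the single-partition gzip read, finally runs.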
Exact error:
Traceback (most recent call last):
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\IPython\core\interactiveshell.py", line 3553, in run_code
exec(code_obj, self.user_global_ns, self.user_ns)
File "<ipython-input-52-29ce59d87600>", line 350, in <cell line: 350>
plinePSA.to_csv(r'\\files\User\test.csv', single_file=True, chunksize = 100)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\dataframe\core.py", line 1699, in to_csv
return to_csv(self, filename, **kwargs)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\dataframe\io\csv.py", line 972, in to_csv
return list(dask.compute(*values, **compute_kwargs))
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\base.py", line 598, in compute
results = schedule(dsk, keys, **kwargs)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\threaded.py", line 89, in get
results = get_async(
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\local.py", line 511, in get_async
raise_exception(exc, tb)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\local.py", line 319, in reraise
raise exc
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\local.py", line 224, in execute_task
result = _execute_task(task, data)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\optimization.py", line 990, in __call__
return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\core.py", line 149, in get
result = _execute_task(task, cache)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\core.py", line 119, in _execute_task
return func(*(_execute_task(a, cache) for a in args))
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\dataframe\io\csv.py", line 129, in __call__
df = pandas_read_text(
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\dataframe\io\csv.py", line 182, in pandas_read_text
df = reader(bio, **kwargs)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\util\_decorators.py", line 311, in wrapper
return func(*args, **kwargs)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\io\parsers\readers.py", line 586, in read_csv
return _read(filepath_or_buffer, kwds)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\io\parsers\readers.py", line 488, in _read
return parser.read(nrows)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\io\parsers\readers.py", line 1059, in read
df = DataFrame(col_dict, columns=columns, index=index)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\frame.py", line 614, in __init__
mgr = dict_to_mgr(data, index, columns, dtype=dtype, copy=copy, typ=manager)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\construction.py", line 464, in dict_to_mgr
return arrays_to_mgr(
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\construction.py", line 135, in arrays_to_mgr
return create_block_manager_from_arrays(
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\managers.py", line 1773, in create_block_manager_from_arrays
blocks = _form_blocks(arrays, names, axes, consolidate)
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\managers.py", line 1838, in _form_blocks
numeric_blocks = _multi_blockify(
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\managers.py", line 1928, in _multi_blockify
values, placement = _stack_arrays(
File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\managers.py", line 1957, in _stack_arrays
stacked = np.empty(shape, dtype=dtype)
numpy.core._exceptions._ArrayMemoryError: Unable to allocate 2.11 GiB for an array with shape (3, 189229412) and data type float32