
I am trying to clean and filter a large data file I receive each month. Recently the data has grown even larger for a variety of reasons, and I can no longer use pandas. I've been looking for an alternative, and so far Dask has seemed to work until it comes to the export step. My simplified code is:

import pandas as pd
import numpy as np
import dask.dataframe as dd
import dask.array as da
MAP = pd.read_csv('map.csv')
MAP = dd.from_pandas(MAP, npartitions=1)
MAP2 = dd.read_csv('map2.csv')
MAP3 = dd.read_csv('map3.csv')
MAP = dd.merge(
    MAP,
    MAP2,
    how="left",
    left_on=("id1", "id2", "id3"),
    right_on=("id1", "id2", "id3"),
    indicator=False)
MAP = MAP.drop_duplicates(subset=["id1", "id2", "id3",'col1','col2' ])

BIG_DATA = dd.read_csv("BIG_DATA.gz",
                    sep='|',
                    compression='gzip',
                    header=None,
                    blocksize=None,
                    dtype={0: np.int64, 1: np.int64, 4: np.int16, 6: str, 8: np.int64, 9: np.float32, 10: np.float32,
                           11: np.float32, 19: str, 32: str, 37: np.int32, 40: np.float32})

BIG_DATA = dd.merge(
    BIG_DATA,
    MAP3,
    how="left",
    left_on=("id3", "id4"),
    right_on=("id3", "id4"),
    indicator=False)

BIG_DATA = BIG_DATA[filter condition]

groupvars = [g1, g2, g3, g4, g5, g6, g7...g17]
sumlist = [s1, s2, s3, s4]
BIG_DATA = BIG_DATA.groupby(groupvars)[sumlist].sum().reset_index()

BIG_DATA = dd.merge(
    BIG_DATA,
    MAP,
    how="outer",
    left_on=("id1", "id2"),
    right_on=("id1", "id2"),
    indicator=True)
BIG_DATA = BIG_DATA[~BIG_DATA['_merge'].isin(['right_only'])]

BIG_DATA1 = BIG_DATA[filter condition1]
BIG_DATA2 = BIG_DATA[filter condition2]

OUTPUT = dd.concat([BIG_DATA1, BIG_DATA2]).reset_index()

OUTPUT = OUTPUT.repartition(npartitions=100000) #have tried multiple values here
OUTPUT.to_csv(r'\\files\User\test.csv', single_file=True)

When using pandas, this process crashes at the groupby statement. I thought Dask might be the way around this, but it seems to always fail when I try to export to CSV. I'm new to Python and Dask, but I'm guessing it is delaying the groupby statement until the export and failing for the same reason as pandas? I've created the same result set using Fortran, and it produces a 100 MB CSV file with approximately 600k rows of data. I'm not really sure how to go about changing this so that it will work.
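
Since Dask builds a lazy task graph and only executes it when a result is requested, the read_csv, merges, and groupby above all actually run inside the to_csv call, which is why the error only shows up at export. Below is a minimal, self-contained sketch of that behaviour using made-up data (not my real columns):

import pandas as pd
import numpy as np
import dask.dataframe as dd

pdf = pd.DataFrame({
    "id1": np.random.randint(0, 100, 1_000),
    "s1": np.random.rand(1_000),
})
ddf = dd.from_pandas(pdf, npartitions=4)

result = ddf.groupby("id1")["s1"].sum().reset_index()
print(type(result))  # still a lazy dask DataFrame; nothing has been computed yet

# The whole graph only executes here; if any upstream step needs more memory
# than is available, the error surfaces at this call.
result.to_csv("lazy-demo-*.csv")

That said, the failing frame in the traceback below is pandas_read_text, i.e. parsing the CSV text itself, so the allocation error may actually come from reading the gzipped file as a single partition (blocksize=None) rather than from the groupby.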

Exact error:

Traceback (most recent call last):
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\IPython\core\interactiveshell.py", line 3553, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-52-29ce59d87600>", line 350, in <cell line: 350>
    plinePSA.to_csv(r'\\files\User\test.csv', single_file=True, chunksize = 100)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\dataframe\core.py", line 1699, in to_csv
    return to_csv(self, filename, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\dataframe\io\csv.py", line 972, in to_csv
    return list(dask.compute(*values, **compute_kwargs))
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\base.py", line 598, in compute
    results = schedule(dsk, keys, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\threaded.py", line 89, in get
    results = get_async(
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\local.py", line 511, in get_async
    raise_exception(exc, tb)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\local.py", line 319, in reraise
    raise exc
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\local.py", line 224, in execute_task
    result = _execute_task(task, data)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\optimization.py", line 990, in __call__
    return core.get(self.dsk, self.outkey, dict(zip(self.inkeys, args)))
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\core.py", line 149, in get
    result = _execute_task(task, cache)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\core.py", line 119, in _execute_task
    return func(*(_execute_task(a, cache) for a in args))
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\dataframe\io\csv.py", line 129, in __call__
    df = pandas_read_text(
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\dask\dataframe\io\csv.py", line 182, in pandas_read_text
    df = reader(bio, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\util\_decorators.py", line 311, in wrapper
    return func(*args, **kwargs)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\io\parsers\readers.py", line 586, in read_csv
    return _read(filepath_or_buffer, kwds)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\io\parsers\readers.py", line 488, in _read
    return parser.read(nrows)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\io\parsers\readers.py", line 1059, in read
    df = DataFrame(col_dict, columns=columns, index=index)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\frame.py", line 614, in __init__
    mgr = dict_to_mgr(data, index, columns, dtype=dtype, copy=copy, typ=manager)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\construction.py", line 464, in dict_to_mgr
    return arrays_to_mgr(
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\construction.py", line 135, in arrays_to_mgr
    return create_block_manager_from_arrays(
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\managers.py", line 1773, in create_block_manager_from_arrays
    blocks = _form_blocks(arrays, names, axes, consolidate)
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\managers.py", line 1838, in _form_blocks
    numeric_blocks = _multi_blockify(
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\managers.py", line 1928, in _multi_blockify
    values, placement = _stack_arrays(
  File "C:\ProgramData\Anaconda3\envs\pythonProject1\lib\site-packages\pandas\core\internals\managers.py", line 1957, in _stack_arrays
    stacked = np.empty(shape, dtype=dtype)
numpy.core._exceptions._ArrayMemoryError: Unable to allocate 2.11 GiB for an array with shape (3, 189229412) and data type float32
  • is df = dd.merge(df, map) supposed to be df = dd.merge(df, map_df) in your example? – n4321d Aug 18 '22 at 18:22
  • Yes it is. That was just a mistake while simplifying it. – Tyler Aug 18 '22 at 18:53
  • have you tried different block sizes for read_csv as suggested here: https://stackoverflow.com/questions/54459056/dask-memory-error-when-running-df-to-csv? – n4321d Aug 18 '22 at 19:08
  • *I'm guessing it is delaying the groupby statement until the export and failing for the same reason as pandas* < yes - this is definitely what's happening. it's going to be hard for us to debug without a [mre] though :/ the problems I see with your workflow are groupby, reset_index, and then merge. merge requires knowing the full index in order to understand how to schedule the operation, so you need to hold the full file in memory if you have a number of operations which must complete before the index can be known. – Michael Delgado Aug 18 '22 at 20:32
  • also... what are you merging on? the positional index? this isn't well defined in dask. did you mean to provide column names to the merge? – Michael Delgado Aug 18 '22 at 20:33
  • This allowed the export to run, but I first had to go onto the network machine and unzip the gzipped source file. While not ideal, I can live with this since it is a once-a-month issue (see the sketch after these comments). The result set it produces varies from what it should be, though. Is it possible blocksize might alter the groupby sums? It could just be an issue with how I adapted the filtering from pandas. – Tyler Aug 18 '22 at 20:45
  • @MichaelDelgado Yes, I meant to show that I am merging on specific columns. The post will be edited to make this clearer. I've edited the code to be more similar to my actual workflow, but I cannot easily provide a sample data set since it is highly confidential. – Tyler Aug 18 '22 at 21:05
  • you can always generate fake data. there's nothing about this that is specific to your dataset. see https://stackoverflow.com/questions/20109391/how-to-make-good-reproducible-pandas-examples – Michael Delgado Aug 18 '22 at 21:29
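
Following up on the blocksize suggestion above, this is roughly what the read looks like once the source file has been decompressed; the file name and the 64 MB figure are placeholders rather than my real values:

import numpy as np
import dask.dataframe as dd

# Read the gunzipped copy of the source; an uncompressed text file can be
# split, so Dask can parse it in many small partitions instead of one huge one.
BIG_DATA = dd.read_csv(
    "BIG_DATA.csv",
    sep="|",
    header=None,
    blocksize="64MB",
    dtype={0: np.int64, 1: np.int64, 4: np.int16, 6: str, 8: np.int64,
           9: np.float32, 10: np.float32, 11: np.float32, 19: str,
           32: str, 37: np.int32, 40: np.float32},
)

# Partitioning only changes how the work is split up; groupby(...).sum()
# aggregates across partitions, so the totals should not depend on blocksize.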
