
When calling dask.dataframe.to_parquet(data): if data was read via Dask with a given number of partitions and you try to save it in Parquet format after having removed some columns, it fails with an error like the following:

FileNotFoundError: [Errno 2] No such file or directory: 'part.0.parquet'

Anyone encountered the same issue?

Here is a minimal example - note that way 1 works as expected, while way 2 does NOT:

import numpy as np
import pandas as pd
import dask.dataframe as dd

# -------------
# way 1 - works
# -------------
print('way 1 - start')
A = np.random.rand(200,300)
cols = np.arange(0, A.shape[1])
cols = [str(col) for col in cols]
df = pd.DataFrame(A, columns=cols)
ddf = dd.from_pandas(df, npartitions=11)

# compute and resave
ddf.drop(cols[0:11], axis=1)
dd.to_parquet(
    ddf, 'error.parquet', engine='auto', compression='default',
    write_index=True, overwrite=True, append=False)
print('way 1 - end')

# ----------------------
# way 2 - does NOT work 
# ----------------------
print('way 2 - start')
ddf = dd.read_parquet('error.parquet')

# compute and resave
ddf.drop(cols[0:11], axis=1)
dd.to_parquet(
    ddf, 'error.parquet', engine='auto', compression='default',
    write_index=True, overwrite=True, append=False)
print('way 2 - end')

1 Answer


Oh, great, you've spotted a bug caused by the overwrite=True option. What happens is that when overwrite=True is set, dask removes the path first, see these lines. Now, in your example ddf is lazy, so when it's time to write the data, dask tries to read the source files, but by then they are already gone.

So one workaround is to save the new dataframe to a different path, then remove the old folder and move the new dataframe's folder into its place (some of the options are here).
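Here is a minimal sketch of that workaround, assuming a local filesystem; the temporary path name 'error_tmp.parquet' and the use of shutil are my own choices, not something dask prescribes:

import shutil
import dask.dataframe as dd

# read lazily and drop the unwanted columns
ddf = dd.read_parquet('error.parquet')
ddf = ddf.drop(cols[0:11], axis=1)

# write to a temporary path first, so the source files are
# still on disk while dask computes the lazy graph
dd.to_parquet(ddf, 'error_tmp.parquet', write_index=True)

# only now is it safe to delete the original folder and swap in the new one
shutil.rmtree('error.parquet')
shutil.move('error_tmp.parquet', 'error.parquet')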

Another option is to load the ddf into memory (if it fits) and then use your code:

print('way 2 - start')
ddf = dd.read_parquet('error.parquet')

# persist in memory (note: do not use .compute(), because
# that would turn the result into a pandas DataFrame)
ddf = ddf.drop(cols[0:11], axis=1)
ddf = ddf.persist()
dd.to_parquet(
    ddf, 'error.parquet', engine='auto', compression='default',
    write_index=True, overwrite=True, append=False)
print('way 2 - end')

As a side note, ddf.drop(cols[0:11], axis=1) does not modify the dataframe in place; if you want the change reflected in the dataframe, you need to assign the result:

ddf = ddf.drop(cols[0:11], axis=1)

Update: there is some relevant discussion here.

  • Thank you @SultanOrazbayev, I modified the question and added a minimal example – GMc Mar 25 '21 at 07:24
  • 1
    Thank you, @SultanOrazbayev, that is what I was looking for! And yes, it seems like a bug to me. – GMc Mar 25 '21 at 08:14