I have an HDF store containing a single dataframe of 60089424 rows x 5 columns. All columns are categorical, and they all share the same mapping of values to codes, which has 3318655 categories. The store is compressed with complevel=9 and complib='blosc:lz4' and written with format='table'. The file is 1.1 GB on disk. I am mentioning all this because I don't know whether it is relevant to my question.
I now wanted to drop rows from the dataframe that contain values with a frequency below a certain threshold, and I wanted to do this in a reasonable amount of time. I asked a question about that here.
I eventually managed to construct code that seems to do the job in a reasonable amount of time using Dask (if this question here can be solved, I will answer my own question over there with that code). There is one big issue, though: the code does not terminate. It runs fine up to the compute() call. compute() also finishes, as indicated by the completed progress bar. But then it just hangs: the next line (print(f'{time_to_display()}: dropped rows with low frequency words')) is never executed.
Now... I really am at a loss as to why that is. I am almost literally pulling my hair out, in fact. Can you explain to me why the compute() call displays as completed yet does not finish?
import time
import dask.dataframe as ddf
import dask
import numpy as np
import pandas as pd
def time_to_display():
    return time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
hdfstore_filename = '<the HDF file>'
threshold = 100
print(f'{time_to_display()}: filtering data for low frequency words')
print(f'file = {hdfstore_filename}.h5')
store = pd.HDFStore(f'{hdfstore_filename}.h5')
df = store.select(key='/ngrams')
print(f'{time_to_display()}: data loaded')
unique, counts = np.unique(df.values.ravel(), return_counts=True)
print(f'{time_to_display()}: gathered frequencies of words [1]')
d = dict(zip(unique, counts))
print(f'{time_to_display()}: gathered frequencies of words [2]')
to_remove = [k for k, v in d.items() if v < threshold]
print(f'{time_to_display()}: gathered indices of values to remove')
df_dask = ddf.from_pandas(df, chunksize=1000000) # results in 60 chunks for this data
print(f'{time_to_display()}: df_dask created')
mask = df_dask.isin(to_remove)
print(f'{time_to_display()}: mask created')
column_mask = (~mask).all(axis=1)
print(f'{time_to_display()}: column_mask created')
df_dask = df_dask[column_mask]
print(f'{time_to_display()}: df_dask filtered')
df_dask.visualize(filename='log/df_dask', format='pdf')
print(f'{time_to_display()}: computation graph rendered')
from dask.diagnostics import ProgressBar
with ProgressBar():
    df_out = dask.compute(df_dask)[0]
print(f'{time_to_display()}: dropped rows with low frequency words')
df_out.to_hdf(f'{hdfstore_filename}_filtered_complete.h5', 'ngrams', complevel=9,
              complib='blosc:lz4', format='table')
print(f'{time_to_display()}: store written')
store.close()
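As an aside, since all five columns share one category mapping, the filter itself can also be sketched in plain pandas/NumPy by working on the integer codes directly, side-stepping the Dask round trip entirely. This is only a sketch on hypothetical tiny data (the column names c1/c2 and the categories are made up, and it assumes no missing values, because .cat.codes uses -1 for NaN):

```python
import numpy as np
import pandas as pd

threshold = 3
dtype = pd.CategoricalDtype(['low', 'mid', 'high'])
df = pd.DataFrame({
    'c1': pd.Categorical(['low', 'mid', 'mid', 'high'], dtype=dtype),
    'c2': pd.Categorical(['mid', 'mid', 'low', 'high'], dtype=dtype),
})

# one (n_rows, n_cols) array of integer codes; valid because all
# columns share the same categorical dtype
codes = np.stack([df[c].cat.codes.to_numpy() for c in df.columns], axis=1)

# frequency of every category code, counted across all columns at once
counts = np.bincount(codes.ravel(), minlength=len(dtype.categories))
rare = counts < threshold            # one boolean flag per category code

# keep only the rows in which no column holds a rare value
df_out = df[~rare[codes].any(axis=1)]
print(df_out)
```

Whether this is faster than the Dask route on 60M rows is an open question, but it never has to reassemble partitioned categorical frames.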
I added the progress messages to find out how long the individual calls take. The output is:
2018-12-04 22:35:46: filtering data for low frequency words
file = data/5grams_wiki_00/5grams_wiki_00_cat_final.h5
2018-12-04 22:36:31: data loaded
2018-12-04 22:45:33: gathered frequencies of words [1]
2018-12-04 22:45:34: gathered frequencies of words [2]
2018-12-04 22:45:35: gathered indices of values to remove
2018-12-04 22:45:52: df_dask created
2018-12-04 22:46:12: mask created
2018-12-04 22:46:12: column_mask created
2018-12-04 22:46:12: df_dask filtered
2018-12-04 22:46:13: computation graph rendered
[########################################] | 100% Completed | 12min 2.4s
I let the program run for another 30 minutes, but no new messages appeared.
edit
I have now run this with a version of the same dataset with one difference: the columns each had their own distinct categorical encoding. This time the gap between the progress bar reporting completion and the next progress message being printed was "only" 15 minutes. So the crux is the categorical encoding. Any idea why?
2018-12-05 13:00:46: filtering data for low frequency words
file = data/5grams_wiki_00/5grams_wiki_00_cat.h5
2018-12-05 13:01:34: data loaded
2018-12-05 13:11:00: gathered frequencies of words [1]
2018-12-05 13:11:01: gathered frequencies of words [2]
2018-12-05 13:11:02: gathered indices of values to remove
2018-12-05 13:11:19: df_dask created
2018-12-05 13:11:31: mask created
2018-12-05 13:11:31: column_mask created
2018-12-05 13:11:31: df_dask filtered
2018-12-05 13:11:31: computation graph rendered
[########################################] | 100% Completed | 5min 52.3s
2018-12-05 13:28:49: dropped rows with low frequency words
2018-12-05 13:31:05: store written
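Since Dask has to stitch the computed partitions back into a single pandas dataframe at the end, one hypothesis is that concatenating categorical columns with a multi-million-entry category mapping dominates that post-bar phase. The cost of the concatenation alone can be probed in isolation; a minimal sketch with small stand-in partitions (the real dtype would carry ~3.3M categories, which is where the cost presumably grows):

```python
import time

import pandas as pd

# stand-in partitions that all share one categorical dtype
dtype = pd.CategoricalDtype([f'w{i}' for i in range(1000)])
parts = [pd.DataFrame({'c': pd.Categorical(['w1'] * 1000, dtype=dtype)})
         for _ in range(10)]

t0 = time.time()
merged = pd.concat(parts, ignore_index=True)
elapsed = time.time() - t0
print(f'{len(merged)} rows in {elapsed:.3f}s, result dtype: {merged["c"].dtype}')
```

Scaling the number of categories in the stand-in dtype up toward the real 3318655 should show whether this step alone accounts for the missing minutes.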