The following code reads a simple .csv file with four string columns and a header row, then adds a column to the frame by taking each row of the 'posted' column (a date string) and computing the appropriate day of the week. However, the code throws a 'NotImplementedError' and does not seem to recognize the data types even though they are defined in the function parameters (see the error message below).
I have tried dataframe.read_csv both with and without specifying the column data types and get the same error. The line where the error occurs is a list comprehension, but I get the same error with a plain loop. The data frames look correct when I print them out, but the data types are all 'object', which does not seem right.
'NotImplementedError' seems to suggest the DataFrame is being indexed in an unsupported way, but since every operation here works on an individual frame, where is Dask seeing a non-Series object?
from dask import delayed, compute, visualize, dataframe

...

def treat(frame):
    frame["day"] = [pd.Timestamp(value) for value in frame.posted]
    print(frame.columns)
    return frame

def find_files():
    ...

def construct_frames(files):
    dataframes = []
    # choose 3 of all the files
    selection = [files[random.randrange(len(files) - 1)] for i in range(1, 4)]
    for pair in selection:
        key = pair[0]
        file = pair[1]
        path = os.path.join(TOP_DIR + "/engagement_id=" + key + "/" + file)
        data = dataframe.read_csv(
            path,
            dtype={"id": str, "data_import_id": str, "posted": str, "amount": str})
        print(data.columns, data.head())
        treat(data)
        dataframes.append(data)
    return dataframes

files = find_files()
dataframes = construct_frames(files)
visualize(dataframes)
Output (in Jupyter):
Dask DataFrame Structure:
id data_import_id posted amount
npartitions=1
object object object object
... ... ... ...
Dask Name: from-delayed, 3 tasks
---------------------------------------------------------------------------
NotImplementedError Traceback (most recent call last)
<ipython-input-8-e30d04e9aed0> in <module>
47
48 files = find_files()
---> 49 dataframes = construct_frames(files)
50
51
<ipython-input-8-e30d04e9aed0> in construct_frames(files)
42 dtype={"id":str,"data_import_id": str, "posted": str, "amount": str})
43 print(data)
---> 44 treat(data)
45 dataframes.append(data)
46 return dataframes
<ipython-input-8-e30d04e9aed0> in treat(frame)
15
16 def treat(frame):
---> 17 frame["day"] = [pd.Timestamp(value) for value in frame.posted]
18 print(frame.columns)
19 return frame
<ipython-input-8-e30d04e9aed0> in <listcomp>(.0)
15
16 def treat(frame):
---> 17 frame["day"] = [pd.Timestamp(value) for value in frame.posted]
18 print(frame.columns)
19 return frame
/anaconda3/envs/dask-tutorial/lib/python3.6/site-packages/dask/dataframe/core.py in __getitem__(self, key)
2059 return Series(graph, name, self._meta, self.divisions)
2060 raise NotImplementedError(
-> 2061 "Series getitem in only supported for other series objects "
2062 "with matching partition structure"
2063 )
NotImplementedError: Series getitem in only supported for other series objects with matching partition structure
The data looks like this, i.e. alphanumeric strings plus a date string that gets converted to a 'day' in a new column:
id data_import_id posted amount
00000000 3c221ff 2014-01-02T19:00:00.000-05:00 3656506
00000013 3c221ff 2014-01-03T19:00:00.000-05:00 3656506
00000015 3c221ff 2014-01-04T19:00:00.000-05:00 3656506
0000000a 3c221ff 2014-01-05T19:00:00.000-05:00 3656506
00000001 3c221ff 2014-01-06T19:00:00.000-05:00 3656506