2

The following code is to read a simple .csv file with four columns with string values and a header row. Then another column is added to the frame that takes each row in the 'posted' column (a date string) and provides the appropriate day of the week for each row. However the code throws a 'not implemented' error and does not seem to be recognizing the data types even though they are defined in the function parameters (see error message below).

I have tried Dataframe.read_csv both specifying and not specifying the column data types but get the same error. The line where the error takes place is a list comprehension but I have the same error with a loop. The data frames seem correct when I print them out but the data types are all 'object' which is not correct.

'NotImplemented' seems to mean the Dataframe is being changed but since all operations are on an individual frame where is Dask seeing a non-Series object?

from dask import delayed, compute, visualize, dataframe

...

def treat(frame):
    frame["day"] = [pd.Timestamp(value) for value in frame.posted]
    print(frame.columns)
    return frame

def find_files():
...

def construct_frames(files):
    dataframes = []
    # choose 3 of all the files
    selection = [files[random.randrange(len(files) - 1)] for i in range(1,4)]
    for pair in selection:
        key = pair[0]
        file = pair[1]
        path = os.path.join(TOP_DIR + "/engagement_id=" + key + "/" + file)
        data = dataframe.read_csv(path,
                                  dtype={"id":str,"data_import_id": str, "posted": str, "amount": str})
        print(data.columns, data.head())
        treat(data)
        dataframes.append(data)
    return dataframes

files = find_files()
dataframes = construct_frames(files)
visualize(dataframes)

Output (in Jupyter):

Dask DataFrame Structure:
                   id data_import_id  posted  amount
npartitions=1                                       
               object         object  object  object
                  ...            ...     ...     ...
Dask Name: from-delayed, 3 tasks
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
<ipython-input-8-e30d04e9aed0> in <module>
     47 
     48 files = find_files()
---> 49 dataframes = construct_frames(files)
     50 
     51 

<ipython-input-8-e30d04e9aed0> in construct_frames(files)
     42                                   dtype={"id":str,"data_import_id": str, "posted": str, "amount": str})
     43         print(data)
---> 44         treat(data)
     45         dataframes.append(data)
     46     return dataframes

<ipython-input-8-e30d04e9aed0> in treat(frame)
     15 
     16 def treat(frame):
---> 17     frame["day"] = [pd.Timestamp(value) for value in frame.posted]
     18     print(frame.columns)
     19     return frame

<ipython-input-8-e30d04e9aed0> in <listcomp>(.0)
     15 
     16 def treat(frame):
---> 17     frame["day"] = [pd.Timestamp(value) for value in frame.posted]
     18     print(frame.columns)
     19     return frame

/anaconda3/envs/dask-tutorial/lib/python3.6/site-packages/dask/dataframe/core.py in __getitem__(self, key)
   2059             return Series(graph, name, self._meta, self.divisions)
   2060         raise NotImplementedError(
-> 2061             "Series getitem in only supported for other series objects "
   2062             "with matching partition structure"
   2063         )

NotImplementedError: Series getitem in only supported for other series objects with matching partition structure

Data looks kind of like this ie alpha numeric strings and a date string that gets converted to a 'day' in a new column:

id  data_import_id  posted  amount
00000000  3c221ff  2014-01-02T19:00:00.000-05:00  3656506
00000013  3c221ff  2014-01-03T19:00:00.000-05:00  3656506
00000015  3c221ff  2014-01-04T19:00:00.000-05:00  3656506
0000000a  3c221ff  2014-01-05T19:00:00.000-05:00  3656506
00000001  3c221ff  2014-01-06T19:00:00.000-05:00  3656506
edesz
  • 11,756
  • 22
  • 75
  • 123
MikeB2019x
  • 823
  • 8
  • 23
  • Could you post a sample of the `.csv` file (without posting any private data) - maybe the first 5 rows? Also, did you try using pandas to read the first 5 rows `data = pd.read_csv(path, dtype={...}, nrows=5)` to see if Pandas can read it? – edesz Apr 30 '19 at 02:23
  • 1
    See edit. Pandas can read it fine. I'm wondering if it is because I'm trying to add a column? – MikeB2019x Apr 30 '19 at 02:54
  • Yes, I agree. It's because of how the column is being added - that functionality is not available in `dask`. – edesz Apr 30 '19 at 13:08

1 Answers1

3

I got the error at this line

frame["day"] = [pd.Timestamp(value) for value in frame.posted]

Turns out there are a few possibilities to append a column to a dask DataFrame

  • these approaches assume that timezone information is not important
  • if the timezone is important, then please see the comment by @MikeB2019x here for how to take this into account

Using map_partitions (per this SO post)

ddf = dataframe.read_csv('test.csv',
                        delimiter="  ",
                        engine='python',
                        dtype={"id":str,"data_import_id": str, "amount": str})
meta = ('posted', 'datetime64[ns]')
ddf['posted'] = ddf.posted.map_partitions(pd.to_datetime, meta=meta)
ddf = treat(ddf)

print(ddf.head())

         id data_import_id                    posted   amount  day_of_week   weekday
0  00000000        3c221ff 2014-01-02 19:00:00-05:00  3656506            2  Thursday
1  00000013        3c221ff 2014-01-03 19:00:00-05:00  3656506            3    Friday
2  00000015        3c221ff 2014-01-04 19:00:00-05:00  3656506            4  Saturday
3  0000000a        3c221ff 2014-01-05 19:00:00-05:00  3656506            5    Sunday
4  00000001        3c221ff 2014-01-06 19:00:00-05:00  3656506            6    Monday

print(ddf.dtypes)
id                        object
data_import_id            object
posted            datetime64[ns]
amount                    object
day_of_week                int64
weekday                   object
dtype: object

Using .to_datetime (per this SO post)

ddf = dataframe.read_csv('test.csv',
                        delimiter="  ",
                        engine='python',
                        dtype={"id":str,"data_import_id": str, "amount": str})
ddf['posted']=dataframe.to_datetime(ddf.posted, format="%Y%m%d %H:%M:%S") # option 1
# ddf['posted']=dataframe.to_datetime(ddf.posted, unit='ns') # option 2
ddf = treat(ddf)

print(ddf.head())
         id data_import_id                    posted   amount  day_of_week   weekday
0  00000000        3c221ff 2014-01-02 19:00:00-05:00  3656506            2  Thursday
1  00000013        3c221ff 2014-01-03 19:00:00-05:00  3656506            3    Friday
2  00000015        3c221ff 2014-01-04 19:00:00-05:00  3656506            4  Saturday
3  0000000a        3c221ff 2014-01-05 19:00:00-05:00  3656506            5    Sunday
4  00000001        3c221ff 2014-01-06 19:00:00-05:00  3656506            6    Monday

print(ddf.dtypes)
id                        object
data_import_id            object
posted            datetime64[ns]
amount                    object
day_of_week                int64
weekday                   object
dtype: object

Or, just specify the parse_dates argument to .read_csv

ddf = dataframe.read_csv('test.csv',
                        delimiter="  ",
                        engine='python',
                        parse_dates=['posted'],
                        dtype={"id":str,"data_import_id": str, "amount": str})
ddf = treat(ddf)

print(ddf.head())
         id data_import_id                    posted   amount  day_of_week   weekday
0  00000000        3c221ff 2014-01-02 19:00:00-05:00  3656506            2  Thursday
1  00000013        3c221ff 2014-01-03 19:00:00-05:00  3656506            3    Friday
2  00000015        3c221ff 2014-01-04 19:00:00-05:00  3656506            4  Saturday
3  0000000a        3c221ff 2014-01-05 19:00:00-05:00  3656506            5    Sunday
4  00000001        3c221ff 2014-01-06 19:00:00-05:00  3656506            6    Monday

print(ddf.dtypes)
id                                                object
data_import_id                                    object
posted            datetime64[ns, pytz.FixedOffset(-300)]
amount                                            object
day_of_week                                        int64
weekday                                           object
dtype: object

BTW, datetime attributes (.dt datetime namespace) can be used on a dask series similarly to pandas - see here

def treat(frame):
    frame['day_of_week'] = frame['posted'].dt.day
    frame['weekday'] = frame['posted'].dt.weekday_name
    return frame
edesz
  • 11,756
  • 22
  • 75
  • 123
  • 1
    Tnx! Note that in my case the timezone information is important so the second approach (first option) works in my case if I set 'utc=True' in the 'dataframe.to_datetime' function. For the last approach to work you have to include: ' date_parser=lambda col: pd.to_datetime(col, utc=True), ' as an attribute in the '.read_csv' basically specify the date parser [link](https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html?highlight=utc) – MikeB2019x Apr 30 '19 at 15:28
  • Thanks for the info about this @MikeB2019x. I have edited the answer to point to your comment about this. – edesz Apr 30 '19 at 15:40