
I'm attempting to merge two large dataframes (one with 50k+ values, and another with 650k+ values, pared down from 7M+). The merging/matching is done via fuzzywuzzy, to find which string in the first dataframe most closely matches which string in the other.

At the moment, it takes about 3 minutes to test 100 rows for matches. Consequently, I'm attempting to use Dask to help with the processing speed. In doing so, Dask returns the following error: 'NotImplementedError: Series getitem in only supported for other series objects with matching partition structure'

Presumably, the error is due to my dataframes not being of equal size. When trying to set a chunksize while converting my pandas dataframes to Dask dataframes, I receive an error (TypeError: 'float' object cannot be interpreted as an integer), even though I previously forced all the datatypes in each dataframe to 'object'. Consequently, I was forced to use the npartitions parameter in the dataframe conversion instead, which then leads to the NotImplementedError above.

I've tried to standardize the chunksize against the partitions with a mathematical index, and I've also tried the npartitions parameter, to no effect; both result in the same NotImplementedError.

As mentioned, my efforts to do this without Dask have been successful, but far too slow to be useful.

I've also taken a look at these questions/responses:

  • Different error
  • No solution presented
  • Seems promising, but results are still slow

import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
from fuzzywuzzy import fuzz

prices_filtered_ddf = dd.from_pandas(prices_filtered, chunksize=25e6)  # prices_filtered: 404.2MB
all_data_ddf = dd.from_pandas(all_data, chunksize=25e6)  # all_data: 88.7MB

client = Client()
dask.config.set(scheduler='processes')

# Define matching function
def match_name(name, list_names, min_score=0):
    # -1 score in case we don't get any matches
    max_score = -1
    # Returning empty name for no match as well
    max_name = ""
    # Iterating over all names in the other
    for name2 in list_names:
        #Finding fuzzy match score
        score = fuzz.token_set_ratio(name, name2)
        # Checking if we are above our threshold and have a better score
        if (score > min_score) & (score > max_score):
            max_name = name2
            max_score = score
    return (max_name, max_score)

# List for dicts for easy dataframe creation
dict_list = []
# iterating over our players without salaries found above
for name in prices_filtered_ddf['ndc_description'][:100]:
    # Use our method to find best match, we can set a threshold here
    match = client(match_name(name, all_data_ddf['ndc_description_agg'], 80))

    # New dict for storing data
    dict_ = {}
    dict_.update({'ndc_description_agg' : name})
    dict_.update({'ndc_description' : match[0]})
    dict_.update({'score' : match[1]})
    dict_list.append(dict_)

merge_table = pd.DataFrame(dict_list)
# Display results
merge_table

Here's the full error:

NotImplementedError                       Traceback (most recent call last)
<ipython-input-219-e8d4dcb63d89> in <module>
      3 dict_list = []
      4 # iterating over our players without salaries found above
----> 5 for name in prices_filtered_ddf['ndc_description'][:100]:
      6     # Use our method to find best match, we can set a threshold here
      7     match = client(match_name(name, all_data_ddf['ndc_description_agg'], 80))

C:\Anaconda\lib\site-packages\dask\dataframe\core.py in __getitem__(self, key)
   2671             return Series(graph, name, self._meta, self.divisions)
   2672         raise NotImplementedError(
-> 2673             "Series getitem in only supported for other series objects "
   2674             "with matching partition structure"
   2675         )

NotImplementedError: Series getitem in only supported for other series objects with matching partition structure


I expect that merge_table will return, in a relatively short time, a dataframe with data for each of the updated columns. At the moment, it's extremely slow.

alofgran

1 Answer


I'm afraid there are a number of problems with this question, so after pointing these out, I can only provide some general guidance.

  • The traceback shown is clearly not produced by the code above
  • The indentation and syntax are broken
  • A distributed client is made, then the config is set not to use it ("processes" is not the distributed scheduler); see the snippet after this list
  • The client object is called, client(...), but it is not callable; this shouldn't work at all
  • The main processing function, match_name, is called directly; how do you expect Dask to intervene?
  • You don't ever call compute(), so in the code given, I'm not sure Dask is invoked at all.
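
To illustrate the scheduler point, here is the conflict in isolation: Client() registers the distributed scheduler as the default, and the later config call quietly replaces it.

from dask.distributed import Client
import dask

client = Client()                       # the distributed scheduler is now the default
dask.config.set(scheduler='processes')  # overrides it with the local multiprocessing scheduler
# any .compute() from here on uses 'processes' and ignores the client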

What you actually want to do:

  • Load your smaller reference dataframe using pandas, and call client.scatter to make sure all the workers have it
  • Load your main data with dd.read_csv
  • Call df.map_partitions(..) to process your data, where the function you pass should take two pandas dataframes and work row-by-row; a sketch follows this list
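
A minimal sketch of that approach, reusing the question's match_name logic. The CSV file names and the find_matches helper are illustrative assumptions, not tested against your data; the scattered future is passed straight into map_partitions, and the distributed scheduler resolves it on the workers.

import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client
from fuzzywuzzy import fuzz

def match_name(name, list_names, min_score=0):
    # Same logic as the question's match_name: best token_set_ratio above the threshold
    max_score = -1
    max_name = ""
    for name2 in list_names:
        score = fuzz.token_set_ratio(name, name2)
        if score > min_score and score > max_score:
            max_name, max_score = name2, score
    return max_name, max_score

def find_matches(partition, reference_names, min_score=80):
    # Runs once per pandas partition on a worker; everything in here is plain pandas
    results = [match_name(n, reference_names, min_score)
               for n in partition['ndc_description']]
    return pd.DataFrame({
        'ndc_description': partition['ndc_description'].values,
        'best_match': [r[0] for r in results],
        'score': [r[1] for r in results],
    }, index=partition.index)

client = Client()  # keep the distributed scheduler as the default; no config override

# Small reference table: load with pandas and broadcast it to every worker once
all_data = pd.read_csv('all_data.csv')
reference = client.scatter(all_data['ndc_description_agg'], broadcast=True)

# Large table: read straight into dask rather than converting from pandas
prices_filtered_ddf = dd.read_csv('prices_filtered.csv')

# meta describes the output frame so dask doesn't have to guess it
meta = pd.DataFrame({'ndc_description': pd.Series(dtype=object),
                     'best_match': pd.Series(dtype=object),
                     'score': pd.Series(dtype='int64')})

merge_table = prices_filtered_ddf.map_partitions(
    find_matches, reference, meta=meta
).compute()

This keeps the quadratic fuzzy-matching loop, but runs it on all partitions in parallel, and the reference names are shipped to each worker only once rather than once per task.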
mdurant
  • Thanks @mdurant. This is one of those situations where nothing had worked after trying a bunch of different things, so you got my last attempt. I tried client.scatter, converted the pandas dataframes to dask dataframes (though if it's better to read them straight into dask, that'd be helpful to know), and called compute within the client(match_name...) phrase. I'd tried enough variations that it's difficult to list them all - nothing that's your fault. That said, your answer does give me some places to start, particularly with config set and map_partitions. I'll update this when I run those changes. – alofgran Jul 30 '19 at 21:39