I'm attempting to merge two large dataframes (one with 50k+ rows, and another with 650k+ rows, pared down from 7M+). Merging/matching is being done via fuzzywuzzy, to find which string in the first dataframe most closely matches which string in the other.
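For context, the scoring call looks like this (a minimal sketch; the sample strings are made-up placeholders):

```python
from fuzzywuzzy import fuzz

# token_set_ratio tokenizes both strings and compares their token sets,
# returning an integer score from 0 (no similarity) to 100 (identical sets)
score = fuzz.token_set_ratio("ACME Pharma 10mg tablets", "acme pharma tablets 10mg")
print(score)  # 100 - word order and casing don't matter
```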
At the moment, it takes about 3 minutes to test 100 rows. Consequently, I'm attempting to introduce Dask to speed up the processing. In doing so, Dask returns the following error: 'NotImplementedError: Series getitem in only supported for other series objects with matching partition structure'
Presumably, the error is due to my dataframes not being of equal size. When trying to set a chunksize while converting my pandas dataframes to Dask dataframes, I receive an error (TypeError: 'float' object cannot be interpreted as an integer), even though I previously forced all the datatypes in each dataframe to 'object'. Consequently, I fell back on the npartitions parameter in the conversion, which then leads to the 'NotImplementedError' above.
I've tried to standardize the chunksize across partitions with a mathematical index, and also tried the npartitions parameter, but both attempts end in the same NotImplementedError.
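For reference, this is the kind of conversion I've been attempting, shown here with a toy dataframe (as I understand it, chunksize in dd.from_pandas counts rows per partition and must be an integer, which is presumably why a float like 25e6 blows up):

```python
import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'ndc_description': ['foo', 'bar', 'baz', 'qux']})

# chunksize is rows per partition and must be an int; a float such as 25e6
# raises "TypeError: 'float' object cannot be interpreted as an integer"
ddf_by_rows = dd.from_pandas(df, chunksize=2)

# alternatively, set the number of partitions directly
ddf_by_parts = dd.from_pandas(df, npartitions=2)
```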
As mentioned, my efforts to run this without Dask have been successful, but far too slow to be useful.
I've also taken a look at these questions/responses:

- Different error
- No solution presented
- Seems promising, but results are still slow
```python
import dask
import dask.dataframe as dd
import pandas as pd
from dask.distributed import Client
from fuzzywuzzy import fuzz

# chunksize counts rows per partition and must be an int
prices_filtered_ddf = dd.from_pandas(prices_filtered, chunksize=int(25e6))  # prices_filtered: 404.2MB
all_data_ddf = dd.from_pandas(all_data, chunksize=int(25e6))  # all_data: 88.7MB

client = Client()
dask.config.set(scheduler='processes')
# Define matching function
def match_name(name, list_names, min_score=0):
    # -1 score in case we don't get any matches
    max_score = -1
    # Returning empty name for no match as well
    max_name = ""
    # Iterating over all names in the other dataframe
    for name2 in list_names:
        # Finding fuzzy match score
        score = fuzz.token_set_ratio(name, name2)
        # Checking if we are above our threshold and have a better score
        if (score > min_score) and (score > max_score):
            max_name = name2
            max_score = score
    return (max_name, max_score)
# List of dicts for easy dataframe creation
dict_list = []
# Iterating over the first 100 descriptions in prices_filtered
for name in prices_filtered_ddf['ndc_description'][:100]:
    # Use our method to find the best match; we can set a threshold here
    match = client(match_name(name, all_data_ddf['ndc_description_agg'], 80))
    # New dict for storing data
    dict_ = {}
    dict_.update({'ndc_description_agg': name})
    dict_.update({'ndc_description': match[0]})
    dict_.update({'score': match[1]})
    dict_list.append(dict_)

merge_table = pd.DataFrame(dict_list)
# Display results
merge_table
```
Here's the full error:
```
NotImplementedError                       Traceback (most recent call last)
<ipython-input-219-e8d4dcb63d89> in <module>
      3 dict_list = []
      4 # iterating over our players without salaries found above
----> 5 for name in prices_filtered_ddf['ndc_description'][:100]:
      6     # Use our method to find best match, we can set a threshold here
      7     match = client(match_name(name, all_data_ddf['ndc_description_agg'], 80))

C:\Anaconda\lib\site-packages\dask\dataframe\core.py in __getitem__(self, key)
   2671             return Series(graph, name, self._meta, self.divisions)
   2672         raise NotImplementedError(
-> 2673             "Series getitem in only supported for other series objects "
   2674             "with matching partition structure"
   2675         )

NotImplementedError: Series getitem in only supported for other series objects with matching partition structure
```
I expect merge_table to return, in a relatively short time, a dataframe with data for each of the columns populated above. At the moment, it's extremely slow.
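For what it's worth, I suspect the plain slice in the loop is what triggers the NotImplementedError, since a Dask Series apparently only supports getitem with another series of matching partition structure. Here is a sketch of what I think the equivalent would be, using head() to compute the first 100 rows back into pandas (as far as I can tell this sidesteps the error, but the inner scoring loop stays just as slow):

```python
# head() computes the first n rows of a Dask Series into a pandas Series,
# unlike the unsupported plain slice [:100]
first_100 = prices_filtered_ddf['ndc_description'].head(100)

# Pull the candidate names down to plain Python once, before the loop
candidate_names = all_data_ddf['ndc_description_agg'].compute().tolist()

dict_list = []
for name in first_100:
    best_name, best_score = match_name(name, candidate_names, 80)
    dict_list.append({'ndc_description_agg': name,
                      'ndc_description': best_name,
                      'score': best_score})

merge_table = pd.DataFrame(dict_list)
```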