21

I have a dask dataframe with first_name as its index.

import pandas as pd
import numpy as np

from multiprocessing import cpu_count

from dask import dataframe as dd
from dask.multiprocessing import get 
from dask.distributed import Client


NCORES = cpu_count()
client = Client()

entities = pd.DataFrame({'first_name':['Jake','John','Danae','Beatriz', 'Jacke', 'Jon'],'last_name': ['Del Toro', 'Foster', 'Smith', 'Patterson', 'Toro', 'Froster'], 'ID':['X','U','X','Y', '12','13']})

df = dd.from_pandas(entities, npartitions=NCORES)
df = client.persist(df.set_index('first_name'))

(Obviously, in real life entities has several thousand rows.)

I want to apply a user defined function to each grouped dataframe. I want to compare each row with all the other rows in the group (something similar to Pandas compare each row with all rows in data frame and save results in list for each row).

The following is the function that I try to apply:

from fuzzywuzzy import fuzz

def contraster(x, DF):
    # flag the rows in DF whose last_name is similar enough to x
    matches = DF.apply(lambda row: fuzz.partial_ratio(row['last_name'], x) >= 50, axis=1)
    return [i for i, hit in enumerate(matches) if hit]

For the test entities data frame, you could apply the function as usual:

entities.apply(lambda row: contraster(row['last_name'], entities), axis=1)

And the expected result is:

Out[35]: 
0    [0, 4]
1    [1, 5]
2       [2]
3       [3]
4    [0, 4]
5    [1, 5]
dtype: object
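For readers without fuzzywuzzy installed, here is a self-contained sketch of the same all-pairs comparison, using the standard library's difflib.SequenceMatcher as a rough stand-in for fuzz.partial_ratio (the two scores are not identical, so matches at the 50 threshold only approximately agree with the output above):

```python
import difflib
import pandas as pd

def similarity(a, b):
    # crude stand-in for fuzz.partial_ratio, scaled to 0-100
    return 100 * difflib.SequenceMatcher(None, a, b).ratio()

def contraster(x, DF):
    # indices of the rows in DF whose last_name is similar enough to x
    matches = DF.apply(lambda row: similarity(row['last_name'], x) >= 50, axis=1)
    return [i for i, hit in enumerate(matches) if hit]

entities = pd.DataFrame(
    {'first_name': ['Jake', 'John', 'Danae', 'Beatriz', 'Jacke', 'Jon'],
     'last_name': ['Del Toro', 'Foster', 'Smith', 'Patterson', 'Toro', 'Froster'],
     'ID': ['X', 'U', 'X', 'Y', '12', '13']})

result = entities.apply(lambda row: contraster(row['last_name'], entities), axis=1)
```

Every row matches itself (the self-similarity is always 100), and near pairs such as 'Del Toro'/'Toro' clear the threshold, so the result has the same shape as the expected output.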

When entities is huge, the solution is to use dask. Note that DF in the contraster function must be the grouped dataframe.

I am trying to use the following:

df.groupby('first_name').apply(func=contraster, args=????)

But how should I specify the grouped dataframe (i.e., DF in contraster)?

nanounanue
  • Hi again. Could you please clarify... when you are grouping by the first name. What is the purpose of that? If for instance you have 1000 people named Jane, with different and similar last names, what output would you expect? Do you want to compare everybody with the same first name and similar last names? – mortysporty Mar 21 '18 at 19:33
  • The problem that I try to solve is "deduplication", a special type of "record linkage". Comparing all rows against all rows grows quadratically, so it is not feasible. The standard approach is "blocking", i.e. divide the records into blocks and only do the comparison inside each block. Blocking on one exact column is a simplification for the sake of the question. – nanounanue Mar 21 '18 at 19:49
  • Can you make entities a global variable? Then you don't need to pass anything when you use apply. – Mikhail Venkov Mar 21 '18 at 20:58
  • Check out this post for use of the `agg` function rather than `apply`: https://stackoverflow.com/questions/44577019/python-pandas-passing-arguments-to-a-function-in-agg – mortysporty Mar 21 '18 at 21:11

2 Answers

10

The function you provide to groupby-apply should take a Pandas dataframe or series as input and ideally return one (or a scalar value) as output. Extra parameters are fine, but they should be secondary, not the first argument. This is the same in both Pandas and Dask dataframe.

def func(df, x=None):
    # do whatever you want here
    # the input to this function will have all the same first name
    return pd.DataFrame({'x': [x] * len(df),
                         'count': len(df),
                         'first_name': df.first_name})

You can then call df.groupby as normal

import pandas as pd
import dask.dataframe as dd

df = pd.DataFrame({'first_name':['Alice', 'Alice', 'Bob'],
                   'last_name': ['Adams', 'Jones', 'Smith']})

ddf = dd.from_pandas(df, npartitions=2)

ddf.groupby('first_name').apply(func, x=3).compute()

This will produce the same output in either pandas or dask.dataframe

   count first_name  x
0      2      Alice  3
1      2      Alice  3
2      1        Bob  3
MRocklin
  • If I try to run your example I get the following error: ```TypeError: apply() got an unexpected keyword argument 'x'``` – nanounanue Mar 23 '18 at 01:37
  • Try upgrading to a newer version of dask – MRocklin Mar 23 '18 at 14:42
  • I am running the last version of dask (`0.17.2`) – nanounanue Mar 25 '18 at 17:00
  • The above works fine for me on 0.17.2. You could raise an issue with a minimal environment that causes the failure. – MRocklin Mar 25 '18 at 21:33
  • I've verified that this works with a clean install. I created a new environment with `conda install -n myenv dask=0.17.2 ipython` and got the desired results. – MRocklin Mar 27 '18 at 13:15
  • You are right. It runs. To give you the bounty. Could you adapt your example to the method `contraster`? – nanounanue Mar 28 '18 at 23:50
  • I assigned the bounty. I hope that you could modify your answer – nanounanue Mar 29 '18 at 05:26
  • Thank you for assigning the bounty. I prefer to keep the answer as-is so that other readers can execute the code. The contraster function is not self-contained. See https://stackoverflow.com/help/mcve . Feel free to unassign the bounty if you wish. – MRocklin Mar 29 '18 at 11:29
6

With a little bit of guesswork, I think that the following is what you are after.

from fuzzywuzzy import fuzz

def mapper(d):
    def contraster(x, DF=d):
        matches = DF.apply(lambda row: fuzz.partial_ratio(row['last_name'], x) >= 50, axis=1)
        return [DF.ID.iloc[i] for i, hit in enumerate(matches) if hit]
    d['out'] = d.apply(lambda row: contraster(row['last_name']), axis=1)
    return d

df.groupby('first_name').apply(mapper).compute()

Applied to your data, you get:

   ID first_name  last_name   out
2   X      Danae      Smith   [X]
4  12      Jacke       Toro  [12]
0   X       Jake   Del Toro   [X]
1   U       John     Foster   [U]
5  13        Jon    Froster  [13]
3   Y    Beatriz  Patterson   [Y]

i.e., because you group by first_name, each group only contains one item, which matches only with itself.

If, however, you had some first_name values that appeared in multiple rows, you would get matches:

entities = pd.DataFrame(
    {'first_name': ['Jake', 'Jake', 'Jake', 'John'],
     'last_name': ['Del Toro', 'Toro', 'Smith',
                   'Froster'],
     'ID': ['Z', 'U', 'X', 'Y']})

Output:

  ID first_name last_name     out
0  Z       Jake  Del Toro  [Z, U]
1  U       Jake      Toro  [Z, U]
2  X       Jake     Smith     [X]
3  Y       John   Froster     [Y]

If you do not require exact matches on first_name, then maybe you need to sort/set the index by first_name and use map_partitions in a similar way. In that case, you will need to rephrase your question.

mdurant