
Aim: To speed up applying a function row wise across a large data frame (~1.9 million rows)

Attempt: Using Dask map_partitions, where the number of partitions equals the number of cores. I've written a function which is applied to each row and creates a dict containing a variable number of new values (between 1 and 55). This function works fine standalone.

Problem: I need a way to combine the output of each function call into a final dataframe. I tried using df.append, where I'd append each dict to a new dataframe and return this dataframe. If I understand the Dask docs, Dask should then combine them into one big DF. Unfortunately this line is tripping an error (ValueError: could not broadcast input array from shape (56) into shape (1)), which leads me to believe it's something to do with the combine step in Dask?

#Function to be applied row wise down the dataframe. Takes a column (post) and a new empty df.
def func(post,New_DF):
    post = str(post)
    scores = OtherFUNC.countWords(post)
    scores['post'] = post
    New_DF = New_DF.append(scores, ignore_index=True)
    return(New_DF)

#Dask 
dd.from_pandas(dataset,npartitions=nCores).\
 map_partitions(
      lambda df : df.apply(
         lambda x : func(x.post,New_DF),axis=1)).\
   compute(get=get)
F.D

1 Answer

I am not quite sure I completely understand your code in the absence of an MCVE, but I think there is a bit of a misunderstanding here.

In this piece of code you take a row and a DataFrame and append one row to that DataFrame.

#Function to be applied row wise down the dataframe. Takes a column (post) and a new empty df.
def func(post,New_DF):
    post = str(post)
    scores = OtherFUNC.countWords(post)
    scores['post'] = post
    New_DF = New_DF.append(scores, ignore_index=True)
    return(New_DF)

Instead of appending to New_DF, I would recommend just returning a pd.Series, which df.apply concatenates into a DataFrame. If you are appending to the same New_DF object in all nCores partitions, you are bound to run into trouble.

#Function to be applied row wise down the dataframe. Takes a row and returns a row.
def tobsecret_func(row):
    post = str(row.post)
    scores = OtherFUNC.countWords(post)
    scores['post'] = post
    length_adjusted_series = pd.Series(scores).reindex(range(55))
    return(length_adjusted_series)

Your error also suggests that, as you wrote in your question, your function creates a variable number of values. If the pd.Series objects you return don't all have the same shape and index, df.apply will fail to concatenate them into a pd.DataFrame. Therefore make sure you return a pd.Series of equal shape each time. This question shows you how to create pd.Series of equal length and index: Pandas: pad series on top or bottom

I don't know what kind of dict your OtherFUNC.countWords returns exactly, so you may want to adjust the line: length_adjusted_series = pd.Series(scores).reindex(range(55))

As is, the line would return a Series with an index 0, 1, 2, ..., 54 and up to 55 values (if the dict originally had fewer than 55 keys, the remaining cells will contain NaN values). This means that after being applied to a DataFrame, the columns of that DataFrame would be named 0, 1, 2, ..., 54.
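To illustrate, here is a quick sketch of what that reindex does, using a made-up dict in place of the real OtherFUNC.countWords output:

import pandas as pd

# Made-up scores dict standing in for OtherFUNC.countWords output
scores = {0: 3, 1: 1, 2: 5}

length_adjusted_series = pd.Series(scores).reindex(range(55))
print(length_adjusted_series.shape)  # (55,) - always the same length
print(length_adjusted_series[:5])    # values at 0, 1, 2; NaN at 3 and 4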

Now you take your dataset, map your function to each partition, and within each partition apply it to the DataFrame using apply.

#Dask 
dd.from_pandas(dataset,npartitions=nCores).\
 map_partitions(
      lambda df : df.apply(
         lambda x : func(x.post,New_DF),axis=1)).\
   compute(get=get)

map_partitions expects a function which takes a DataFrame as input and outputs a DataFrame. Your code does this via a lambda that calls your other function and applies it to a DataFrame, which in turn returns a DataFrame. This works, but I highly recommend writing a named function which takes a DataFrame as input and outputs a DataFrame; it makes it easier for you to debug your code.

For example with a simple wrapper function like this:

def df_wise(df):
    return df.apply(tobsecret_func, axis=1)

Especially as your code gets more complex, abstain from using lambda functions that call non-trivial code like your custom func, and instead make a simple named function. This helps you debug because the traceback will not just lead you to a line with a bunch of lambda functions like in your code, but will directly point to the named function df_wise, so you will see exactly where the error is coming from.

#Dask
ddf = dd.from_pandas(dataset, npartitions=nCores)
ddf.map_partitions(df_wise,
                   meta=df_wise(ddf.head())
                   ).compute(get=get)

Notice that we just fed ddf.head() to df_wise to create our meta keyword, which is similar to what Dask would do under the hood.

You are using dask.get, the synchronous scheduler, which is why the whole New_DF.append(...) code could work at all: the partitions are processed one after the other, each appending to the same DataFrame.

This gives you no parallelism, and the pattern will break if you use any of the other schedulers, all of which run your code in parallel.
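For completeness, a minimal sketch of the same pipeline on a parallel scheduler, assuming a dask version that still exposes the old get= API (as the rest of this answer does) and the dask.multiprocessing scheduler:

# Sketch only: swap the synchronous dask.get for the multiprocessing scheduler.
from dask.multiprocessing import get as mp_get

ddf = dd.from_pandas(dataset, npartitions=nCores)
result = ddf.map_partitions(df_wise, meta=df_wise(ddf.head())).compute(get=mp_get)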

The documentation also mentions the meta keyword argument, which you should supply to your map_partitions call, so dask knows what columns your DataFrame will have. If you don't do this, dask will first have to do a trial run of your function on one of the partitions and check what the shape of the output is before it can go ahead and do the other partitions. This can slow down your code by a ton if your partitions are large; giving the meta keyword bypasses this unnecessary computation for dask.
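If you'd rather spell out meta explicitly instead of computing it from a head() sample, a sketch could look like the following (assuming the 55 reindexed columns all hold floats; adjust the dtypes to whatever OtherFUNC.countWords actually returns):

import pandas as pd

# Sketch only: an empty DataFrame with the expected columns and dtypes,
# assuming the 55 reindexed columns are all floats.
# Pass it as meta=explicit_meta to map_partitions instead of df_wise(ddf.head()).
explicit_meta = pd.DataFrame(columns=range(55), dtype='float64')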

tobsecret
  • Thanks for your detailed answer; your answer highlighted my lack of knowledge, so I've gone back to Dask basics to brush up. I'll hold off accepting an answer until I can fully understand what's happening with map_partitions. Sorry about this! – F.D Jul 30 '18 at 20:50
  • Hey, hope this is helpful and wasn't too rude. If you want any more clarifications, please let me know! Dask has some of its own rules, don't feel bad if you don't intuit it quickly - neither did I. – tobsecret Jul 30 '18 at 21:33
  • No it wasn't! It was a very suitable response! I've been looking into this a bit more, and as per this [question](https://stackoverflow.com/questions/51602248/python-dask-map-partitions#51603469) an external function will only be called once per partition, where I need it row wise. I could try using .map instead, but then I run into an issue with returning multiple values/columns. In the end I'm not sure Dask is really suitable for this task, unless you can think of something I've missed? – F.D Jul 31 '18 at 08:41
  • I hinted at this a bit in my answer - the way you are using map_partitions should work fine (besides the `New_DF.append` logic and the fact that your func is returning variable length `pd.Series`). I will add an edit how I would re-factor your code. – tobsecret Jul 31 '18 at 15:08
  • Cheers, so I "understand?" as far down as passing dd.head() to the meta. I understand the logic, but this throws an error that dask.dataframe has no attribute head, which suggests the dd is not being created correctly, perhaps? Also, are you aware of a dask.get alternative that would allow for parallelising the combining? Thanks for the help and the increasingly detailed replies! – F.D Jul 31 '18 at 16:09
  • Hmmm... what version of dask are you using? [dask.dataframe.head](http://dask.pydata.org/en/latest/dataframe-api.html#dask.dataframe.DataFrame.head) is definitely in the API documentation – tobsecret Jul 31 '18 at 16:21
  • In my version of dask, the following prints a 5,5 shaped DataFrame: `import numpy as np; import dask.dataframe as dd;` `print(dd.from_array(np.random.randint(1,9, (10, 5))).head())` – tobsecret Jul 31 '18 at 16:27
  • Yes, that works for me as well. At this stage I've moved away from using Dask. Thank you for your help, but I don't think it's going to work for me this time! I've accepted your answer and hopefully it can support some people in the future :) – F.D Aug 01 '18 at 09:32
  • Oh ok thanks for accepting and hope you get to take a look back at dask at some point - it's a really nice tool! – tobsecret Aug 01 '18 at 14:28