
Suppose I have the following code to generate a dummy dask dataframe:

import pandas as pd
import dask.dataframe as dd

pandas_dataframe = pd.DataFrame({'A': [0, 500, 1000], 'B': [-100, 200, 300], 'C': [0, 0, 1.0]})
test_data_frame = dd.from_pandas(pandas_dataframe, npartitions=1)

Ideally, I would like to know the recommended way to add another column to the dataframe, computing the column content through a rolling window, in a lazy fashion.

I came up with the following approach:

import numpy as np
from dask import delayed  # note: dask.delayed is the decorator itself, not a module to alias

@delayed
def coupled_operation_example(dask_dataframe,
                              list_of_input_lbls,
                              fcn,
                              window_size,
                              init_value,
                              output_lbl):

    def preallocate_channel_data(vector_length, first_components):
        # preallocate the output vector and seed it with the initial values
        vector_out = np.zeros(vector_length)
        vector_out[0:len(first_components)] = first_components
        return vector_out

    def create_output_signal(relevant_data, fcn, window_size, initiated_vec):
        # to be written; fcn would be a function accepting the sliding window
        pass

    initiated_vec = preallocate_channel_data(len(dask_dataframe), init_value)
    relevant_data = dask_dataframe[list_of_input_lbls]
    my_output_signal = create_output_signal(relevant_data, fcn, window_size, initiated_vec)
    return my_output_signal

I was writing this, convinced that dask dataframes would allow me some slicing: they do not. So, my first option would be to extract the columns involved in the computations as numpy arrays, but then they would be eagerly evaluated. I think the performance penalty would be significant. At the moment I create dask dataframes from h5 data, using h5py: so everything is lazy, until I write output files.
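
(As an aside, the only lazy alternative I am aware of, though I have not benchmarked it, would be to take the columns as dask arrays instead of numpy arrays, along these lines:)

# Untested sketch: .values on a dask dataframe should give a lazy dask array,
# so nothing is materialised until compute() is called.
lazy_block = test_data_frame[['A', 'B']].values
print(type(lazy_block))   # expected: a dask array, not a numpy array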

Up to now I was only processing data row by row, so I had been using:

 test_data_frame.apply(fcn, axis=1, meta=float)

I do not think there is an equivalent functional approach for rolling windows; am I right? I would like something like Seq.windowed in F# or Haskell. Any suggestion is highly appreciated.
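
For reference, the closest I can see to that windowed style is the rolling / map_overlap machinery; the snippet below is only a sketch of what I have in mind, and I have not verified that dask's rolling(...).apply(...) forwards a user function (and raw=True) to pandas the way I assume here:

# Sketch, not verified: assumes dask's Rolling.apply passes each window to a
# user function the same way pandas does (as a numpy array when raw=True).
def window_fcn(window):
    return window[-1] - window[0]   # difference across the window

windowed = test_data_frame['A'].rolling(2).apply(window_fcn, raw=True)
test_data_frame['D'] = windowed     # stays lazy until .compute() / file output

# Alternative sketch with map_overlap, which handles windows that straddle
# partition boundaries by prepending `before` rows from the previous partition.
diffed = test_data_frame['A'].map_overlap(lambda s: s.diff(), before=1, after=0)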

  • You can use rolling (see [here](https://github.com/dask/dask/issues/3769)) but you might want to set a proper index given that your data is in different partitions. – rpanai Apr 02 '19 at 11:42
  • @user32185: thanks. I will look into that; if I understand well I will have to enlarge the data set to keep track of the overall index in the non-partitioned dataframe? – pacta_sunt_servanda Apr 02 '19 at 11:48
  • I have looked into data_frame.rolling(rolling_window), but I do not get how to define the function to be applied. Also: shall I use apply? df.rolling(window_size).apply(f) ? It does not seem to be working. Thanks – pacta_sunt_servanda Apr 02 '19 at 12:38
  • On a further note: I have found this syntax to be working: df.rolling(2).mean().persist(), posted here: http://matthewrocklin.com/blog/work/2017/07/03/scaling but it only allows me to call simple statistical functions (like 'mean'); it does not seem to be meant to pass any user-defined function. – pacta_sunt_servanda Apr 02 '19 at 12:40
  • Apparently from this https://stackoverflow.com/questions/31361721/python-dask-dataframe-support-for-trivially-parallelizable-row-apply it would seem that apply is quite slow. If anyone is interested I can post performance results. Also, in my specific case, I need a window size of only two (actual state and previous state). I am getting away with a closure (which can be generalized, but not interested, at the moment). If there is interest, again, I can post my code. – pacta_sunt_servanda Apr 03 '19 at 11:20
  • Please update your answer with timing. – rpanai Apr 03 '19 at 13:08
  • @user32185 before proceeding I posed another question; since it came to my mind that the order of the rows could potentially be scrambled, if processed through apply. For this I have created this question: https://stackoverflow.com/questions/55495906/does-dask-dataframe-apply-preserve-rows-order I will make some tests and update with timing, if the order is preserved; though I should be sure that the rows order is always preserved adopting the apply method. – pacta_sunt_servanda Apr 03 '19 at 13:50

1 Answer


I have tried to solve it through a closure. I will post benchmarks on some data as soon as I have finalized the code. For now I have the following toy example, which seems to work, since dask dataframe's apply method seems to preserve the row order.

import numpy as np
import pandas as pd
import dask.dataframe as dd

number_of_components = 30

df = pd.DataFrame(np.random.randint(0, number_of_components, size=(number_of_components, 2)), columns=list('AB'))
my_data_frame = dd.from_pandas(df, npartitions=1)


def sumPrevious(previousState):
    # closure: previousState is captured and updated on every call
    def getValue(row):
        nonlocal previousState
        something = row['A'] - previousState  # difference with the previous row's 'A'
        previousState = row['A']
        return something

    return getValue


given_func = sumPrevious(1)
out = my_data_frame.apply(given_func, axis=1, meta=float)
df['computed'] = out.compute()
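
A quick sanity check I used (only a sketch, and it relies on the order preservation mentioned above): apart from the first element, the computed column should match pandas' diff of column 'A'.

# Sanity check (sketch): compare against pandas' diff of column 'A'
expected = df['A'].diff()
print(np.allclose(df['computed'].iloc[1:].values, expected.iloc[1:].values))  # expected: True
print(df['computed'].iloc[0], df['A'].iloc[0] - 1)  # first element vs. the seeded previousState = 1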

Now the bad news: I have tried to abstract it out, passing the state around and allowing a rolling window of any width, through this new function:

def generalised_coupled_computation(previous_state, coupled_computation, previous_state_update):
    # generalisation of sumPrevious: the caller supplies both the computation
    # on (current, previous) and the rule used to update the carried state
    def inner_function(actual_state):
        nonlocal previous_state
        actual_value = coupled_computation(actual_state, previous_state)
        previous_state = previous_state_update(actual_state, previous_state)
        return actual_value

    return inner_function

Suppose we initialize the function with:

init_state = df.loc[0]
coupled_computation = lambda act, prev: act['A'] - prev['A']
new_update = lambda act, prev: act
given_func3 = generalised_coupled_computation(init_state, coupled_computation, new_update)
out3 = my_data_frame.apply(given_func3, axis=1, meta=float)

Try to run it and be ready for surprises: the first element is wrong, possibly some reference or aliasing problem, given the odd result. Any insight?

Anyhow, if one passes primitive types, it seems to work.
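
To illustrate what I mean by primitive types, here is a minimal sketch with a plain scalar as the carried state (the names are just for the example):

# Minimal sketch: same closure, but the state is a scalar, so no object is
# shared between the current row and the stored previous state.
scalar_func = generalised_coupled_computation(
    1,                                  # initial scalar state, as in sumPrevious(1)
    lambda act, prev: act['A'] - prev,  # difference with the previous 'A'
    lambda act, prev: act['A']          # next state is the current value of 'A'
)
out_scalar = my_data_frame.apply(scalar_func, axis=1, meta=float)
print(out_scalar.compute().head())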


Update:

The solution is to use copy (apparently the stored previous state was otherwise keeping a live reference to the row object passed in by apply):

import copy

def new_update(act, previous):
    # store an independent copy of the current row, not a reference to it
    return copy.copy(act)

Now the function behaves as expected; of course it is necessary to adapt the state-update function and the coupled-computation function if one needs more complex coupling logic.
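
Putting it together, this is the complete usage with the copy-based update, still on the toy data from above (the comparison against pandas' diff is only a sketch):

# Complete corrected example: generalised closure + copy-based state update.
init_state = copy.copy(df.loc[0])   # independent copy of the first row as the initial state
coupled_computation = lambda act, prev: act['A'] - prev['A']
given_func4 = generalised_coupled_computation(init_state, coupled_computation, new_update)

out4 = my_data_frame.apply(given_func4, axis=1, meta=float)
result = out4.compute()

# Sketch of a check: from the second element on, the result should equal pandas' diff of 'A'
print(np.allclose(result.iloc[1:].values, df['A'].diff().iloc[1:].values))  # expected: True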