-1

I am having to begin working with some larger than memory data sets, meaning I need to get familiar with Dask quickly and in a hurry. Its not bad so far but I just ran into a problem that I think I pretty much solved but its not pretty and I wanted to see if there was a better way to go about it.

The Problem: I have time series data stored in a DataFrame. Each column(vector) needs to have a function applied to it. The function returns 3 additional vectors that I would like to append to the original DataFrame.

Code: The first part of the below code is my solution in normal Pandas. The second half is what I did to make it work in Dask.

import numpy as np
import pandas as pd
import os
import dask
import datetime
from dask import delayed
from dask import visualize
import pandas as pd
import dask.dataframe as dd

#### Helper functions

def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''
    import numpy as np

    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
            filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])


    return [labels, avg_filter, std_filter]


def make_example_data():
    # Make example data
    y = np.array(
        [1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
         1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
         2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])
    # simulate data stored in individual files
    df = pd.DataFrame(
        {
            "Time": np.arange(len(y)),
            "y1": y,
            "y2": y * 2,
            "y3": y ** 2,
            "yn": y ** (y)
        }
    )

    bigdf = pd.DataFrame()
    for i in range(10):
        _df = df
        # create my partitioning column
        _df["session"] = "S0" + str(i)
        bigdf = pd.concat([bigdf, _df], axis=0)
    # return a normal dataframe that looks similar to a dask dataframe
    return bigdf

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0


############# Normal Pandas Solution ########################

bigdf = make_example_data()
results_df = pd.DataFrame()
columns = list(bigdf.columns)
columns.remove("Time")
columns.remove("session")
for col in columns:
    res1 = bigdf.groupby("session")[col].apply(peak_detection_smoothed_zscore_v2, lag, threshold, influence)
    res1 = pd.concat([pd.DataFrame(a).T for a in res1])
    res1.columns = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
    results_df = pd.concat([results_df, res1], axis=1)

pd_results = pd.concat([bigdf, results_df], axis=1)

############### Dask Solution ############################
bigdf = make_example_data()
ddf = dd.from_pandas(bigdf, npartitions=10)


columns = list(ddf.columns)
# remove columns that don't have the function applied to them
columns.remove("Time")
columns.remove("session")

# get all the different sessions
sessions = ddf.groupby("session").count().compute().index.tolist()

# column names that get returned by my function
returns = ["_Signal", "_meanFilter", "_stdFilter"]

# list to hold example series for meta data
rcols = []
for col in columns:
    for r in returns:
        s = pd.Series([])
        s.name = col + r
        rcols.append(s)

results = pd.DataFrame(rcols).T
results = dd.from_pandas(results, npartitions=len(sessions))

for session in sessions:
    sess_df = ddf[ddf["session"] == session].compute()
    # making a dask df to store the results in
    sess_results = dd.from_pandas(sess_df, npartitions=1)

    for col in columns:
        # returns a list of 3 lists
        res = peak_detection_smoothed_zscore_v2(sess_df[col], lag, threshold, influence)
        # turn 3 lists into a dataframe of 3 columns
        res = pd.concat([pd.DataFrame(a).T for a in res]).T
        _cols = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
        res.columns = _cols
        # do this iteratively cause I can't figure out how to do it in a single line
        for cc in _cols:
            sess_results[cc] = res[cc]
        # NOTE: If memory is a problem could probably throw this to disk here

    # append session results to main results
    results = results.append(sess_results)

dd_results = results.compute()

print("Are my Dask results the same as my Pandas results?", dd_results.shape == pd_results.shape)

Questions:

  • I am looking for better possible solution. As you can see the Dask code is much longer and kind of complicated. Is there any way to make it less messy? Maybe do away with the forloops?

  • One other problem I foresee is what if I have a Dask partition that is just small enough to fit in memory. What happens when I create 3 more vectors of equal length? Does my system die?

  • If there isn't really a way to clean things up. Am I at least doing things as efficiently as possible?

Thanks

Matt Camp
  • 1,448
  • 3
  • 17
  • 38

1 Answers1

0

After working this problem almost a week I think I have a solution. It is not as succinct as I would like, but it does prevent having to load too much into memory at one time. I am not 100% clear on if I scaled this up and off of just my laptop if it would distribute the tasks to other worker nodes or not.

What I ended up doing was moving my data from feather files into bcolz ctables. This allowed me to mutate the dataframes/ctables without the hassle that Dask introduced. And I am pretty certain I don't have to worry about my computer running out of memory.

import bcolz
import numpy as np
import pandas as pd
import os
import dask
import datetime
from dask import delayed
from dask import visualize
import pandas as pd
import dask.dataframe as dd
from copy import copy


def peak_detection_smoothed_zscore_v2(x, lag, threshold, influence, lst=True):
    '''
    iterative smoothed z-score algorithm
    Implementation of algorithm from https://stackoverflow.com/a/22640362/6029703
    '''
    import numpy as np

    labels = np.zeros(len(x))
    filtered_y = np.array(x)
    avg_filter = np.zeros(len(x))
    std_filter = np.zeros(len(x))
    var_filter = np.zeros(len(x))

    avg_filter[lag - 1] = np.mean(x[0:lag])
    std_filter[lag - 1] = np.std(x[0:lag])
    var_filter[lag - 1] = np.var(x[0:lag])
    for i in range(lag, len(x)):
        if abs(x[i] - avg_filter[i - 1]) > threshold * std_filter[i - 1]:
            if x[i] > avg_filter[i - 1]:
                labels[i] = 1
            else:
                labels[i] = -1
            filtered_y[i] = influence * x[i] + (1 - influence) * filtered_y[i - 1]
        else:
            labels[i] = 0
            filtered_y[i] = x[i]
        # update avg, var, std
        avg_filter[i] = avg_filter[i - 1] + 1. / lag * (filtered_y[i] - filtered_y[i - lag])
        var_filter[i] = var_filter[i - 1] + 1. / lag * ((filtered_y[i] - avg_filter[i - 1]) ** 2 - (
                filtered_y[i - lag] - avg_filter[i - 1]) ** 2 - (filtered_y[i] - filtered_y[i - lag]) ** 2 / lag)
        std_filter[i] = np.sqrt(var_filter[i])

    return [labels, avg_filter, std_filter]


def make_example_data():
    # Make example data
    y = np.array(
        [1, 1, 1.1, 1, 0.9, 1, 1, 1.1, 1, 0.9, 1, 1.1, 1, 1, 0.9, 1, 1, 1.1, 1, 1, 1, 1, 1.1, 0.9, 1, 1.1, 1, 1, 0.9,
         1, 1.1, 1, 1, 1.1, 1, 0.8, 0.9, 1, 1.2, 0.9, 1, 1, 1.1, 1.2, 1, 1.5, 1, 3, 2, 5, 3, 2, 1, 1, 1, 0.9, 1, 1, 3,
         2.6, 4, 3, 3.2, 2, 1, 1, 0.8, 4, 4, 2, 2.5, 1, 1, 1])
    # simulate data stored in individual files
    df = pd.DataFrame(
        {
            "Time": np.arange(len(y)),
            "y1": y,
            "y2": y * 2,
            "y3": y ** 2,
            "yn": y ** (y)
        }
    )

    bigdf = pd.DataFrame()
    for i in range(10):
        _df = df
        # create my partitioning column
        _df["session"] = "S0" + str(i)
        bigdf = pd.concat([bigdf, _df], axis=0)
    # return a normal dataframe that looks similar to a dask dataframe
    return bigdf

def ctable_append(cts):
    """
    A function to append multiple ctables and clean up the disk entries along the 0 axis
    similar to pd.concat([df1, df2], axis=0)


    :param cts: a string containing the root directory path or a list of ctables
    :return: ctable
    """
    import shutil

    ctables = []
    first = True

    # check if we are getting a list or a root dir
    if type(cts) == str:
        cts = bcolz.walk(cts)

    for ct in cts:
        if first is True:
            ct1 = ct
        else:
            ct1.append(ct)
            shutil.rmtree(ct.rootdir)
        first = False

    return ct1

# Settings: lag = 30, threshold = 5, influence = 0
lag = 30
threshold = 5
influence = 0

bigdf = make_example_data()
results_df = pd.DataFrame()
columns = list(bigdf.columns)
columns.remove("Time")
columns.remove("session")
for col in columns:
    res1 = bigdf.groupby("session")[col].apply(peak_detection_smoothed_zscore_v2, lag, threshold, influence)
    res1 = pd.concat([pd.DataFrame(a).T for a in res1])
    res1.columns = [col + "_Signal", col + "_meanFilter", col + "_stdFilter"]
    results_df = pd.concat([results_df, res1], axis=1)

pd_results = pd.concat([bigdf, results_df], axis=1)

bigdf = make_example_data()
sessions = list(set(bigdf['session']))
root_dir = os.path.join(os.getcwd(), 'example_data')

# breaking this example dataset out into something a little more like my real dataset
for session in sessions:
    sdf = bigdf[bigdf['session'] == session]
    sess_dir = os.path.join(root_dir, session)
    bcolz.ctable.fromdataframe(sdf, rootdir=sess_dir)

dnapply_cols = [
    'session',
    'Time'
]  # columns that are not signals to find peaks in

lazy_apply = []
# apply my function to all the data.. making the extra columns
# don't think that Dask is really needed here as I'm not sure if it actually distributes the tasks
# when I ran this on a lot more data I only had one maybe two cores doing anything.
# this could have been because of the cost of memory but my ram didn't really go beyond being
# half used.
for ct in bcolz.walk(root_dir):
    for column in ct.cols.names:
        if column not in dnapply_cols:
            #             signal, mean_filter, std_filter = delayed(peak_detection_smoothed_zscore_v2)(ct[column], lag, threshold, influence)
            res = delayed(peak_detection_smoothed_zscore_v2)(ct[column], lag, threshold, influence)
            lazy_apply.append(delayed(ct.addcol)(res[0], name=column + "_Signal"))
            lazy_apply.append(delayed(ct.addcol)(res[1], name=column + "_meanFilter"))
            lazy_apply.append(delayed(ct.addcol)(res[2], name=column + "_stdFilter"))

dask.compute(*lazy_apply)

# combine all ctables into a single ctable

ct1 = ctable_append(root_dir)
dd_results = dd.from_bcolz(ct1, chunksize=74)  # chose a chunk size of 74 cause thats about how long each session df was
print(dd_results.head(), dd_results.compute().shape, pd_results.shape)
print("Are my Dask results the same as my Pandas results?", dd_results.compute().shape == pd_results.shape)
Matt Camp
  • 1,448
  • 3
  • 17
  • 38
  • 2
    I don't know Dash, but the peak detection algorithm only needs the last `lag` elements to function properly. My original algorithm loops over all datapoints everytime it is called, which is extremely inefficient (I also warn against doing this). So ideally you want to loop over your data with a moving window, only loading `lag` elements in memory, updating the algorithm, removing the data from memory and continue to the next window (+1 new observation). That should be the most memory efficient way of doing it. – Jean-Paul Oct 04 '18 at 08:22