
I am processing a dataframe containing a click-stream, and I'm extracting features for each user in the click-stream to be used in a machine learning project.

The dataframe is something like this:

import numpy as np
import pandas as pd

data = pd.DataFrame({'id':['A01','B01','A01','C01','A01','B01','A01'],
                     'event':['search','search','buy','home','cancel','home','search'],
                     'date':['2018-01-01','2018-01-01','2018-01-02','2018-01-03','2018-01-04','2018-01-04','2018-01-06'],
                     'product':['tablet','dvd','tablet','tablet','tablet','book','book'],
                     'price': [103,2,203,103,203,21,21]})
data['date'] = pd.to_datetime(data['date'])

Since I have to create features for each user, I'm using groupby/apply with a custom function like:

featurized = data.groupby('id').apply(featurize)

The featurize function takes a chunk of the dataframe and creates many (hundreds of) features. The whole process is just too slow, so I'm looking for a recommendation to do this more efficiently.

An example of the function used to create features:

def featurize(group):
    features = dict()

    # Userid
    features['id'] = group['id'].max()
    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event']=='search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product']=='tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1,'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5 max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6 min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7 avg events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8 std events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9 total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product']=='tablet','price'].sum()
    # Feature 10 max price for tablet products
    features['tablet_price_max'] = group.loc[group['product']=='tablet','price'].max()
    # Feature 11 min price for tablet products
    features['tablet_price_min'] = group.loc[group['product']=='tablet','price'].min()
    # Feature 12 mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product']=='tablet','price'].mean()
    # Feature 13 std price for tablet products
    features['tablet_price_std'] = group.loc[group['product']=='tablet','price'].std()
    return pd.Series(features)

One potential problem is that each feature potentially scans the whole chunk, so if I have 100 features I scan the chunk 100 times instead of just once.

For example, one feature can be the number of "tablet" events the user has, another the number of "home" events, another the average time difference between "search" events, then the average time difference between "search" events for "tablets", and so on. Each feature can be coded as a function that takes a chunk (df) and creates the feature, but with hundreds of features each one scans the whole chunk when a single linear scan would suffice. The problem is that the code would get ugly if I wrote a manual for loop over each record in the chunk and coded all the features inside the loop.
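For illustration, a minimal sketch of that per-feature pattern (these helper names are made up, not part of my real code): every feature is a small function of the chunk, and every function re-scans the same chunk:

def n_tablet_events(df):
    # scan #1 over the chunk
    return (df['product'] == 'tablet').sum()

def n_home_events(df):
    # scan #2 over the same chunk
    return (df['event'] == 'home').sum()

feature_funcs = {'number_of_tablets': n_tablet_events,
                 'number_of_home_events': n_home_events}

def featurize_naive(group):
    # with hundreds of entries in feature_funcs, the chunk is re-scanned hundreds of times
    return pd.Series({name: f(group) for name, f in feature_funcs.items()})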

Questions:

  1. If I have to process a dataframe hundreds of times, is there a way to abstract this into a single scan that will create all the needed features?

  2. Is there a speed improvement over the groupby/apply approach I'm currently using?

user3635284
    I suspect you will have a difficult time getting specific answers despite the bounty. I would suggest that you provide the code for implementing 2-3 features, so others have something more concrete to work on. –  Jul 07 '18 at 18:50
  • Is the data already in a dataframe, or are you creating the dataframe from raw data? – Batman Jul 07 '18 at 19:01
  • I added a more detailed example with code that can be reproduced. – user3635284 Jul 08 '18 at 21:52

2 Answers


Disclaimer: the following answer does not properly answer the question above. I'm just leaving it here for the sake of the work invested; maybe it will be of some use at some point.

  1. re-use dataframe selections (e.g. group.loc[group['product']=='tablet','price'])
  2. parallelism (e.g. Parallelize apply after pandas groupby; see code below)
  3. use a cache if you run the calculations multiple times (e.g. HDFStore; see the sketch after this list)
  4. avoid string operations; use native types which numpy can represent efficiently
  5. if you really need strings, use categorical columns, given that they represent categorical data (also sketched after this list)
  6. if the frames are really big, consider using chunks (e.g. "Large data" work flows using pandas)
  7. use cython for further (potentially drastic) improvements
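For (3) and (5), minimal sketches of what I mean (the cache file name is made up, HDFStore requires PyTables, and data and featurize are the objects from the question):

# (5) represent the low-cardinality string columns as categoricals
for col in ['id', 'event', 'product']:
    data[col] = data[col].astype('category')

# (3) cache the featurized result so repeated runs skip the computation
with pd.HDFStore('features_cache.h5') as store:  # hypothetical cache file
    if 'featurized' in store:
        featurized = store['featurized']
    else:
        featurized = data.groupby('id').apply(featurize)
        store['featurized'] = featurized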

As for (2), given your code from above, I could produce speedups of up to 43% (i7-7700HQ CPU, 16 GB RAM).

Timings

using joblib: 68.86841534099949s
using multiprocessing: 71.53540843299925s
single-threaded: 119.05010353899888s

Code

import pandas as pd
import numpy as np
import timeit
import joblib
import multiprocessing


def make_data():
    # just some test data ...
    n_users = 100
    events = ['search', 'buy', 'home', 'cancel']
    products = ['tablet', 'dvd', 'book']
    max_price = 1000

    n_duplicates = 1000
    n_rows = 40000

    df = pd.DataFrame({
        'id': list(map(str, np.random.randint(0, n_users, n_rows))),
        'event': list(map(events.__getitem__, np.random.randint(0, len(events), n_rows))),
        'date': list(map(pd.to_datetime, np.random.randint(0, 100000, n_rows))),
        'product': list(map(products.__getitem__, np.random.randint(0, len(products), n_rows))),
        'price': np.random.random(n_rows) * max_price
    })
    df = pd.concat([df for _ in range(n_duplicates)])
    df.to_pickle('big_df.pkl')
    return df


def data():
    return pd.read_pickle('big_df.pkl')


def featurize(group):
    features = dict()

    # Feature 1: Number of search events
    features['number_of_search_events'] = (group['event'] == 'search').sum()
    # Feature 2: Number of tablets
    features['number_of_tablets'] = (group['product'] == 'tablet').sum()
    # Feature 3: Total time
    features['total_time'] = (group['date'].max() - group['date'].min()) / np.timedelta64(1, 'D')
    # Feature 4: Total number of events
    features['events'] = len(group)
    # Histogram of products examined
    product_counts = group['product'].value_counts()
    # Feature 5 max events for a product
    features['max_product_events'] = product_counts.max()
    # Feature 6 min events for a product
    features['min_product_events'] = product_counts.min()
    # Feature 7 avg events for a product
    features['mean_product_events'] = product_counts.mean()
    # Feature 8 std events for a product
    features['std_product_events'] = product_counts.std()
    # Feature 9 total price for tablet products
    features['tablet_price_sum'] = group.loc[group['product'] == 'tablet', 'price'].sum()
    # Feature 10 max price for tablet products
    features['tablet_price_max'] = group.loc[group['product'] == 'tablet', 'price'].max()
    # Feature 11 min price for tablet products
    features['tablet_price_min'] = group.loc[group['product'] == 'tablet', 'price'].min()
    # Feature 12 mean price for tablet products
    features['tablet_price_mean'] = group.loc[group['product'] == 'tablet', 'price'].mean()
    # Feature 13 std price for tablet products
    features['tablet_price_std'] = group.loc[group['product'] == 'tablet', 'price'].std()
    return pd.DataFrame.from_records(features, index=[group['id'].max()])


# https://stackoverflow.com/questions/26187759/parallelize-apply-after-pandas-groupby
def apply_parallel_job(dfGrouped, func):
    retLst = joblib.Parallel(n_jobs=multiprocessing.cpu_count())(
        joblib.delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)


def apply_parallel_pool(dfGrouped, func):
    with multiprocessing.Pool(multiprocessing.cpu_count()) as p:
        ret_list = list(p.map(func, [group for name, group in dfGrouped]))
    return pd.concat(ret_list)


featurized_job = lambda df: apply_parallel_job(df.groupby('id'), featurize)
featurized_pol = lambda df: apply_parallel_pool(df.groupby('id'), featurize)
featurized_sng = lambda df: df.groupby('id').apply(featurize)

make_data()
print(timeit.timeit("featurized_job(data())", "from __main__ import featurized_job, data", number=3))
print(timeit.timeit("featurized_sng(data())", "from __main__ import featurized_sng, data", number=3))
print(timeit.timeit("featurized_pol(data())", "from __main__ import featurized_pol, data", number=3))

As for (1), consider the following refactoring:

Timings

original: 112.0091859719978s
re-used prices: 83.85681765000118s

Code

# [...]
# re-use the selection once for features 9-13
prices_ = group.loc[group['product'] == 'tablet', 'price']
# Feature 9 total price for tablet products
features['tablet_price_sum'] = prices_.sum()
# Feature 10 max price for tablet products
features['tablet_price_max'] = prices_.max()
# Feature 11 min price for tablet products
features['tablet_price_min'] = prices_.min()
# Feature 12 mean price for tablet products
features['tablet_price_mean'] = prices_.mean()
# Feature 13 std price for tablet products
features['tablet_price_std'] = prices_.std()
# [...]
Michael Hoff
  • Thanks for your answer, paralellization is always an option but I would like to optimize the code for a single processor first. There are a lot of repeated operations and things that I think should run faster. Then it can be parallelized if the resources are there. – user3635284 Jul 09 '18 at 17:23
  • @user3635284 I just updated my answer to elaborate a little bit more on your options. – Michael Hoff Jul 09 '18 at 17:31
  • Appreciated but I'm not looking for generic solutions to speed-up the code. I'm looking for an efficient way to solve this particular problem. – user3635284 Jul 09 '18 at 18:37
  • I kind of misread the question, I guess. To add my 2 cents: I think there is no straightforward way to do it using pure pandas. You could generate a domain-specific language which produces fine-tuned functions to satisfy your feature requirements. Also, you could write generic functions which derive 'all possible' features (plainly, some limitations are required at this point). But such an approach could facilitate one almighty C function which produces most of the features one could ask for in a single loop. – Michael Hoff Jul 09 '18 at 19:20
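To make that comment concrete, here is a pure-Python sketch of the single-scan idea (only a few of the features, names illustrative). Note this interpreted loop will usually be slower than a handful of vectorized scans; the comment's point is that a cython- or numba-compiled version of exactly this loop is what could win:

def featurize_single_scan(group):
    # one linear pass over the chunk, accumulating every counter at once
    n_search = 0
    n_tablets = 0
    tablet_price_sum = 0.0
    for row in group.itertuples(index=False):
        if row.event == 'search':
            n_search += 1
        if row.product == 'tablet':
            n_tablets += 1
            tablet_price_sum += row.price
    return pd.Series({'number_of_search_events': n_search,
                      'number_of_tablets': n_tablets,
                      'tablet_price_sum': tablet_price_sum})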

DataFrame.agg() is your friend here. You're right that the initial implementation scans the entire chunk once for EACH feature. So what we can do is define all of the heavy lifting up front and let pandas handle the internal optimizations; with these libraries, there is very rarely a case where hand-rolled code beats the built-in routines.

What's nice about this approach is that the heavy calculation happens only ONCE, after which all of the fine-tuned feature creation works on the much smaller aggregated result, which is far faster.

This reduces runtime by 65%, which is quite large. And the next time you want a new statistic, you can simply read it off the result of featurize2 instead of running the computation again (see the usage sketch after the code).

df = make_data()
# include squared prices so the std can be recovered later via Var(X) = E[X^2] - E[X]^2
df['price_sq'] = df['price'] ** 2.

def featurize2(df):
    grouped = df.groupby(['id', 'product', 'event'])
    initial = grouped.agg({'price': ['count', 'max', 'min', 'mean', 'std', 'sum', 'size'],
                           'date': ['max', 'min'],
                           'price_sq': ['sum']}).reset_index()
    return initial

def featurize3(initial):

    # Features 5-8
    features = initial.groupby('product').sum()['price']['count'].agg(['max', 'min', 'mean', 'std']).rename({
        'max': 'max_product_events',
        'min': 'min_product_events',
        'mean': 'mean_product_events',
        'std': 'std_product_events'
    })

    searches = initial[initial['event'] == 'search']['price']

    # Feature 1: Number of search events
    features['number_of_search_events'] = searches['count'].sum()

    tablets = initial[initial['product'] == 'tablet']['price']
    tablets_sq = initial[initial['product'] == 'tablet']['price_sq']

    # Feature 2: Number of tablets
    features['number_of_tablets'] = tablets['count'].sum()
    # Feature 9 total price for tablet products
    features['tablet_price_sum'] = tablets['sum'].sum()
    # Feature 10 max price for tablet products
    features['tablet_price_max'] = tablets['max'].max()
    # Feature 11 min price for tablet products
    features['tablet_price_min'] = tablets['min'].min()
    # Feature 12 mean price for tablet products
    features['tablet_price_mean'] = (tablets['mean'] * tablets['count']).sum() / tablets['count'].sum()
    # Feature 13 std price for tablet products
    features['tablet_price_std'] = np.sqrt(
        tablets_sq['sum'].sum() / tablets['count'].sum() - features['tablet_price_mean'] ** 2.)

    # Feature 3: Total time
    features['total_time'] = (initial['date']['max'].max() - initial['date']['min'].min()) / np.timedelta64(1, 'D')

    # Feature 4: Total number of events
    features['events'] = initial['price']['count'].sum()

    return features

def new_featurize(df):
    initial = featurize2(df)
    final = featurize3(initial)
    return final

original = featurize(df)
final = new_featurize(df)

print("featurize(df): {}".format(timeit.timeit("featurize(df)",
                    "from __main__ import featurize, df", number=3)))
print("featurize2(df): {}".format(timeit.timeit("featurize2(df)",
                    "from __main__ import featurize2, df", number=3)))
print("new_featurize(df): {}".format(timeit.timeit("new_featurize(df)",
                    "from __main__ import new_featurize, df", number=3)))

for x in final.index:
    print("outputs for index {} are equal: {}".format(
        x, np.isclose(final[x], original[x])))
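
As a usage sketch of that re-use (the 'buy' feature here is made up, following the same selection pattern as in featurize3): a new statistic can be read straight off the cached featurize2 output without another pass over the raw frame:

initial = featurize2(df)  # heavy pass, done once
# new, hypothetical feature: total number of 'buy' events
buys = initial[initial['event'] == 'buy']['price']
number_of_buy_events = buys['count'].sum()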

Results

featurize(df): 76.0546050072
featurize2(df): 26.5458261967
new_featurize(df): 26.4640090466
outputs for index max_product_events are equal: [ True]
outputs for index min_product_events are equal: [ True]
outputs for index mean_product_events are equal: [ True]
outputs for index std_product_events are equal: [ True]
outputs for index number_of_search_events are equal: [ True]
outputs for index number_of_tablets are equal: [ True]
outputs for index tablet_price_sum are equal: [ True]
outputs for index tablet_price_max are equal: [ True]
outputs for index tablet_price_min are equal: [ True]
outputs for index tablet_price_mean are equal: [ True]
outputs for index tablet_price_std are equal: [ True]
outputs for index total_time are equal: [ True]
outputs for index events are equal: [ True]
Alexander Heath