
I am using the Lifetimes Python package on my Spark cluster and trying to calculate some metrics with it.

I am on Spark 2.4.2. I have a dataframe like the sample below (the original data has ~800K records), containing an 'invoce_date' column and some other columns (id, label, county, etc.).

# reproducible Spark 
df = sqlCtx.createDataFrame(
    [
        ('78aa', 1, 'A', '2020-04-14 19:00:00'),
        ('14aa', 3, 'B', '2020-04-17 16:00:00'),
        ('25aa', 5, 'A', '2020-04-14 15:30:00'),
        ('36aa', 7, 'B', '2020-04-14 21:30:00')
    ],
    ('id', 'X', 'label', 'invoce_date')
)

Here I am trying to leverage Lifetimes

import lifetimes
from pyspark.sql.functions import max

# set the last transaction date as the end point for this historical dataset
current_date = df.agg(max("invoce_date")).collect()[0][0]

# calculate the metrics
metrics = (
  lifetimes.utils.summary_data_from_transaction_data(
    df,
    customer_id_col='id',
    datetime_col='invoce_date',
    observation_period_end = current_date, 
    freq='D'
    )
  )

# display first few rows
metrics.head(10)

This returns the error 'DataFrame' object has no attribute 'sort_values'. I added df = df.sort("invoce_date") before the metrics calculation, but I still keep getting the same error and I can't figure it out.

Here are my dtypes for reference:

df.dtypes
[('id', 'string'),
 ('y', 'string'),
 ('label', 'string'),
 ('invoce_date', 'timestamp')]

1 Answer


Lifetimes works on Pandas dataframes, while the df variable in your example is a PySpark dataframe. Before using functions from the Lifetimes package, you have to convert your data into a Pandas dataframe by calling df.toPandas() (more details here).

Please note that calling toPandas() will load all data into the driver's memory. Lifetimes does not support distributed computing with multiple executors.
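
A minimal sketch of that conversion, reusing the dataframe and column names from your question (not tested against your full dataset):

import lifetimes

# pull the Spark dataframe into the driver as a Pandas dataframe
pdf = df.toPandas()

# end of the observation window: latest invoice date in the data
current_date = pdf['invoce_date'].max()

# calculate the metrics on the Pandas dataframe
metrics = lifetimes.utils.summary_data_from_transaction_data(
    pdf,
    customer_id_col='id',
    datetime_col='invoce_date',
    observation_period_end=current_date,
    freq='D'
)

# display first few rows
metrics.head(10)

If the 800K-row dataframe is too large for the driver, reduce it in Spark first (for example, select only the columns Lifetimes needs, such as id and invoce_date) before calling toPandas().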
