I'm trying to write a function that unpivots a PySpark DataFrame, taking lists as arguments.
For example, here is the code with two lists:
1 - ignored_columns_list - ignored (unused) columns
2 - non_pivot_column_list - columns that I don't want to unpivot
All the remaining columns get unpivoted.
The function:
import pyspark.sql.functions as F
ignored_columns_list = ['column_name1'] # columns that I don't need
non_pivot_column_list = ['column_name2'] # columns I don't want to unpivot
def unpivot_columns_final(kpi_rf_df, ignored_columns_list, non_pivot_column_list):
    ignored_columns_df = kpi_rf_df.drop(*ignored_columns_list)  # everything except the ignored columns
    non_pivot_column_df = kpi_rf_df.select(*non_pivot_column_list)  # columns I don't want to unpivot
    unpivot_columns_df = kpi_rf_df.drop(*ignored_columns_list, *non_pivot_column_list)  # columns that I want to unpivot
    unpivot_columns_df_count = len(unpivot_columns_df.columns)  # column count for the first argument of stack()
    # First stack: the values of the unpivoted columns go into a single 'value' column
    unpivot_df = kpi_rf_df.select(*ignored_columns_df.columns, F.expr(f"stack({unpivot_columns_df_count}, {', '.join(unpivot_columns_df.columns)}) as (value)"))
    # Second stack: the quoted column names go into a 'kpi' column
    unpivot_df = unpivot_df.select(*non_pivot_column_df.columns, F.expr(f"stack({unpivot_columns_df_count}, {str(unpivot_columns_df.columns)[1:-1]}) as (kpi)"), 'value')
    return unpivot_df
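For reference, this is how I call it (the sample DataFrame below is made up; kpi_a and kpi_b just stand in for my real KPI columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up example data: two double-typed KPI columns
kpi_rf_df = spark.createDataFrame(
    [('x', 'y', 1.0, 2.0)],
    ['column_name1', 'column_name2', 'kpi_a', 'kpi_b'],
)
result_df = unpivot_columns_final(kpi_rf_df, ignored_columns_list, non_pivot_column_list)
result_df.show()

With these lists it runs fine.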
The problem is that sometimes, when I change the columns in the lists, I get this error:
AnalysisException: cannot resolve 'stack(6, column_name1, column_name2, column_name3, column_name4, column_name5, column_name6)' due to data type mismatch: Argument 1 (double) != Argument 6 (date); line 1 pos 0;
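From the error message, my understanding is that stack() wants all of its value arguments to have a compatible type, so mixing, say, a double column with a date column reproduces the same exception (a minimal made-up example):

import datetime

# One double column and one date column: stacking them together
# triggers the same data type mismatch at analysis time.
df = spark.createDataFrame([(1.0, datetime.date(2022, 1, 1))], ['a_double', 'a_date'])
df.select(F.expr("stack(2, a_double, a_date) as (value)"))
# AnalysisException: ... Argument 1 (double) != Argument 2 (date)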
I tried sorting the list of columns inside the expr, but it doesn't help.
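The sorting attempt replaced the first stack() line inside the function with roughly this (a sketch, not my exact code):

# Sort the column names before building the stack() expression
sorted_columns = sorted(unpivot_columns_df.columns)
unpivot_df = kpi_rf_df.select(
    *ignored_columns_df.columns,
    F.expr(f"stack({len(sorted_columns)}, {', '.join(sorted_columns)}) as (value)"),
)

It fails with the same kind of mismatch; only the argument numbers in the message change.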