I have two date columns in a PySpark dataframe and need to create a few more columns. For each row I pick a date from the first date column and compute a range of days around it (date1 - d1 to date1 - d2). I then filter the dataframe on the second date column to that range, and after filtering I apply a sum over a feature to produce the new column's value. I don't want to use a loop, and a UDF doesn't accept a dataframe.
def func(dtime, SERIAL, feature, TIME_AUTOCHECK, SERIAL_AC, days, grain, f_n, method, null, df_FINAL=df_FINAL):
    """Aggregate ``feature`` over rows of ``df_FINAL`` for one serial number,
    restricted to a window of days strictly before ``dtime``.

    The window covers dates more than ``f_n*grain`` days and fewer than
    ``(f_n-1)*grain + 1`` days before ``dtime`` (both bounds exclusive).

    Parameters:
        dtime: reference date for the row being computed.
        SERIAL: serial-number value to match against column ``SERIAL_AC``.
        feature: name of the column to aggregate.
        TIME_AUTOCHECK: name of the date column used for the range filter.
        SERIAL_AC: name of the serial-number column in ``df_FINAL``.
        days, grain, f_n: window geometry; only ``grain`` and ``f_n`` are used
            here (``days`` is consumed by the calling loop).
        method: one of "concat", "mode", "least_count", "sum", "count".
        null: for ``method == "count"``, "no" counts non-null values,
            anything else counts nulls.
        df_FINAL: the dataframe to filter. Defaults to the module-level
            dataframe captured at definition time.

    Returns:
        The aggregated value, or None when the filtered window is empty or
        ``method`` is unrecognized.

    NOTE(review): this function is invoked from inside a UDF (see func_udf),
    which cannot work — Spark must pickle the UDF closure, and a DataFrame
    (holding a non-serializable lock via its SparkSession) cannot be pickled;
    that is the reported "cannot pickle '_thread.RLock'" error. A join +
    groupBy aggregation is the workable design — TODO confirm with caller.
    """
    df_FINAL = df_FINAL.withColumn(TIME_AUTOCHECK, F.to_date(col(TIME_AUTOCHECK)))
    # Window boundaries: (f_n-1)*grain + 1 .. f_n*grain days before dtime.
    d1 = int((f_n - 1) * grain + 1)
    d2 = int(f_n * grain)
    date_to = dtime - datetime.timedelta(days=d1)
    date_from = dtime - datetime.timedelta(days=d2)
    new_df = (
        df_FINAL.filter(col(TIME_AUTOCHECK) > date_from)
        .filter(col(TIME_AUTOCHECK) < date_to)
        .filter(col(SERIAL_AC) == SERIAL)
        .select(feature)
    )
    if new_df.count() == 0:
        return None
    # Collect the column once instead of re-collecting in every branch.
    values = new_df.rdd.flatMap(lambda x: x).collect()
    if method == "concat":
        return ",".join(values)
    if method == "mode":
        from statistics import multimode
        return multimode(values)[0]
    if method == "least_count":
        # value_counts() sorts by frequency descending, so the last index
        # entry is the least common value.
        return new_df.toPandas().value_counts().index[-1]
    if method == "sum":
        return sum(map(int, values))
    if method == "count":
        # BUG fixed: the original called `count(...)`, which is not a Python
        # builtin and would raise NameError — len() is the intended operation.
        if null == "no":
            return len([s for s in values if s is not None])
        return len([s for s in values if s is None])
def func_udf(feature, TIM, SERIAL_AC, days, grain, f_n, method, null, df_FINAL=df_FINAL):
    """Return a Spark UDF of (SERIAL, dtime) that delegates to ``func`` with
    the remaining arguments pre-bound.

    NOTE(review): this is the source of the reported
    "TypeError: cannot pickle '_thread.RLock' object". The lambda closes over
    ``df_FINAL``; Spark pickles a UDF's closure to ship it to executors, and a
    DataFrame cannot be pickled (it wraps the driver-side SparkSession, which
    holds a thread lock). A DataFrame can never be referenced inside a UDF —
    this needs to be redesigned as a range join + groupBy aggregation on the
    driver instead of a per-row UDF.
    """
    return udf(lambda SERIAL, dtime: func(dtime, SERIAL, feature, TIM, SERIAL_AC, days, grain, f_n, method, null, df_FINAL=df_FINAL))
from datetime import timedelta
from pyspark.sql.functions import date_add

# Driver loop: for each code column and each day-bucket, add a new column
# computed by the UDF built in func_udf.
cols = ["CODE", "CODE1", "CODE2", "CODE3", "CODE4"]
days = 7
grain = 1
method = "count"
null = "yes"

# NOTE(review): this loop is where the pickling error surfaces — each
# func_udf(...) call produces a UDF whose closure captures df_FINAL, and
# Spark cannot serialize a DataFrame to executors. The per-row UDF approach
# must be replaced with a join-based aggregation.
for c in cols:
    for d in range(1, int(days / grain) + 1):
        f_name = f"{c}_{method}_{d}"  # e.g. "CODE_count_1"
        df_FINAL = df_FINAL.withColumn(
            f_name,
            func_udf(c, "TIME_SYSMSG", "SERIAL_NUMBER_SM", days, grain, d, method, null)(
                col("SERIAL_NUMBER"), col("TIMESTAMP")
            ),
        )
The error I am getting is: TypeError: cannot pickle '_thread.RLock' object