
I have two date columns in a PySpark DataFrame and need to create a few more columns from them. For each row, I pick a date from the first date column and compute a window of days around it, i.e. from date1 - d2 to date1 - d1. I then filter the DataFrame on the second date column so that it falls inside this range, and after filtering apply an aggregation (e.g. a sum) over a feature to produce the new column. I don't want to use a loop, and a UDF doesn't accept a DataFrame as an argument.
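
Concretely, for a single reference date the computation I am after looks roughly like this (df_FINAL is my joined DataFrame; ref_date, d1 and d2 are placeholder values, column names are from my code below):

import datetime
from pyspark.sql import functions as F

ref_date = datetime.date(2022, 1, 15)   # a value picked from the first date column
d1, d2 = 1, 7                           # window is (ref_date - d2, ref_date - d1)

windowed_sum = (
    df_FINAL
    .withColumn("TIME_SYSMSG", F.to_date(F.col("TIME_SYSMSG")))
    .filter(F.col("TIME_SYSMSG") > ref_date - datetime.timedelta(days=d2))
    .filter(F.col("TIME_SYSMSG") < ref_date - datetime.timedelta(days=d1))
    .agg(F.sum("CODE").alias("CODE_sum"))
)

What I need is this per row, with ref_date taken from the first date column of that row. My attempt below wraps the logic in a UDF: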

import datetime
from statistics import multimode

from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf

def func(dtime, SERIAL, feature, TIME_AUTOCHECK, SERIAL_AC, days, grain, f_n, method, null, df_FINAL=df_FINAL):
  df_FINAL = df_FINAL.withColumn(TIME_AUTOCHECK, F.to_date(col(TIME_AUTOCHECK)))
  # window boundaries derived from the grain and the window index f_n
  d1 = int((f_n - 1) * grain + 1)
  d2 = int(f_n * grain)
  date_to = dtime - datetime.timedelta(days=d1)
  date_from = dtime - datetime.timedelta(days=d2)
  # rows of the same serial whose second date column falls strictly inside the window
  new_df = (df_FINAL
            .filter(col(TIME_AUTOCHECK) > date_from)
            .filter(col(TIME_AUTOCHECK) < date_to)
            .filter(col(SERIAL_AC) == SERIAL)
            .select(feature))
  if new_df.count() == 0:
    return None
  values = new_df.rdd.flatMap(lambda x: x).collect()

  if method == "concat":
    return ",".join(values)

  if method == "mode":
    return multimode(values)[0]

  if method == "least_count":
    # least frequent value of the feature inside the window
    return new_df.toPandas().value_counts().index[-1]

  if method == "sum":
    return sum(map(int, values))

  if method == "count":
    # null == "no" counts the non-null values, otherwise the nulls
    if null == "no":
      return len([s for s in values if s is not None])
    return len([s for s in values if s is None])


def func_udf(feature, TIM, SERIAL_AC, days, grain, f_n, method, null, df_FINAL=df_FINAL):
  # df_FINAL is captured in the lambda's closure, so Spark has to pickle it
  # together with the UDF when it serializes the function
  return udf(lambda SERIAL, dtime: func(dtime, SERIAL, feature, TIM, SERIAL_AC, days, grain, f_n, method, null, df_FINAL=df_FINAL))

cols = ["CODE", "CODE1", "CODE2", "CODE3", "CODE4"]
days = 7
grain = 1
method = "count"
null = "yes"

for c in cols:
  for d in range(1, int(days / grain) + 1):
    f_name = c + "_" + method + "_" + str(d)
    df_FINAL = df_FINAL.withColumn(
        f_name,
        func_udf(c, "TIME_SYSMSG", "SERIAL_NUMBER_SM", days, grain, d, method, null)(
            col("SERIAL_NUMBER"), col("TIMESTAMP")))

The error I am getting is: TypeError: cannot pickle '_thread.RLock' object
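
If it helps, I think the failure comes down to the DataFrame being captured in the UDF's closure. A minimal sketch of that pattern, not my real pipeline (assuming an active SparkSession named spark):

from pyspark.sql.functions import col, udf

df = spark.createDataFrame([(1,), (2,)], ["x"])

# the DataFrame is captured in the lambda's closure; a DataFrame holds a
# reference to the SparkSession, which contains an unpicklable RLock
bad_udf = udf(lambda v: df.count())

# fails at execution time with:
# TypeError: cannot pickle '_thread.RLock' object
df.withColumn("y", bad_udf(col("x"))).show()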

    Welcome to StackOverflow! What have you tried? Please include a minimal reproducible example of your problem https://stackoverflow.com/help/minimal-reproducible-example – danielsepulvedab Feb 03 '22 at 13:55
  • Can you provide any code? – wordinone Feb 04 '22 at 02:57
  • Please provide your code in the question and not as an answer. You should be able to _Edit_ your own question. I did it for you. So far your example is a bit too complex. It would be great if you could try to break it down a little bit, make lines no longer than 80 chars, and provide the full error output including the line number. Your error does not sound like it is related to pandas. Your code is not executable: imports are missing, sample data is missing. Please read https://stackoverflow.com/q/20109391/4865723 – buhtz Feb 08 '22 at 16:17
  • It's in PySpark. – Sumit Kumar Feb 08 '22 at 16:21
  • Have you read the content behind the link @danielsepulvedab provided to you? I know it is hard for beginners. But do not be surprised: sometimes you find the answer to your question while creating a good and reproducible minimal working example. You have to help us to help you. – buhtz Feb 08 '22 at 16:22
  • Hi, I just want to know how I can pass the DataFrame which I have joined to the UDF. That's all. Am I doing it right? I created the function and then made it into a UDF. I want to apply the UDF to create more columns, but the error it gives is that it can't serialize. – Sumit Kumar Feb 08 '22 at 16:26
  • By the way, I can't paste numbers here. They are not aligned. – Sumit Kumar Feb 08 '22 at 16:26
  • @wordinone please help – Sumit Kumar Feb 08 '22 at 16:33

0 Answers