I would like some help replacing negative differences between timestamps with zero. I'm running Python 3 on Spark. Here is my code:

```python
from pyspark.sql.functions import col, lit, unix_timestamp, when

timeFmt = "yyyy-MM-dd HH:mm:ss"

# Minutes between consecutive timestamps; 0 when either timestamp is missing.
time_diff_1 = when((col("time1").isNotNull()) &
                   (col("time2").isNotNull()),
                   (unix_timestamp('time2', format=timeFmt) - unix_timestamp('time1', format=timeFmt)) / 60
                   ).otherwise(lit(0))

time_diff_2 = when((col("time2").isNotNull()) &
                   (col("time3").isNotNull()),
                   (unix_timestamp('time3', format=timeFmt) - unix_timestamp('time2', format=timeFmt)) / 60
                   ).otherwise(lit(0))

time_diff_3 = when((col("time3").isNotNull()) &
                   (col("time4").isNotNull()),
                   (unix_timestamp('time4', format=timeFmt) - unix_timestamp('time3', format=timeFmt)) / 60
                   ).otherwise(lit(0))

df = (df
      .withColumn('time_diff_1', time_diff_1)
      .withColumn('time_diff_2', time_diff_2)
      .withColumn('time_diff_3', time_diff_3)
     )

# Second pass: replace negative differences with 0.
df = (df
      .withColumn('time_diff_1', when(col('time_diff_1') < 0, 0).otherwise(col('time_diff_1')))
      .withColumn('time_diff_2', when(col('time_diff_2') < 0, 0).otherwise(col('time_diff_2')))
      .withColumn('time_diff_3', when(col('time_diff_3') < 0, 0).otherwise(col('time_diff_3')))
     )
```

When I run the above code, I get an error. Here is the error:

```text
Py4JJavaError: An error occurred while calling o1083.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 56.0 failed 4 times, most recent failure: Lost task 0.3 in stage 56.0 (TID 7246, fxhclxcdh8.dftz.local, executor 21): org.codehaus.janino.JaninoRuntimeException: failed to compile: org.codehaus.janino.JaninoRuntimeException: Code of method "apply_9$(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass$SpecificUnsafeProjection;Lorg/apache/spark/sql/catalyst/InternalRow;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
/* 001 */ public java.lang.Object generate(Object[] references) {
/* 002 */   return new SpecificUnsafeProjection(references);
/* 003 */ }
/* 004 */
/* 005 */ class SpecificUnsafeProjection extends org.apache.spark.sql.catalyst.expressions.UnsafeProjection {
/* 006 */
/* 007 */   private Object[] references;
/* 008 */   private boolean evalExprIsNull;
/* 009 */   private boolean evalExprValue;
/* 010 */   private boolean evalExpr1IsNull;
/* 011 */   private boolean evalExpr1Value;
/* 012 */   private java.text.DateFormat formatter5;
/* 013 */   private java.text.DateFormat formatter8;
/* 014 */   private java.text.DateFormat formatter12;
/* 015 */   private java.text.DateFormat formatter13;
/* 016 */   private UTF8String.IntWrapper wrapper;
/* 017 */   private java.text.DateFormat formatter15;
/* 018 */   private java.text.DateFormat formatter18;
/* 019 */   private java.text.DateFormat formatter19;
/* 020 */   private java.text.DateFormat formatter23;
/* 021 */   private java.text.DateFormat formatter26;
/* 022 */   private java.text.DateFormat formatter27;
/* 023 */   private java.text.DateFormat formatter30;
/* 024 */   private java.text.DateFormat formatter32;
........
```

Can anybody help?

Edited by desertnaut · Asked by Jimmy
  • Please provide a small [reproducible example](https://stackoverflow.com/questions/48427185/how-to-make-good-reproducible-apache-spark-dataframe-examples). – pault Oct 24 '18 at 14:43

1 Answer


I think the easiest way is to write a simple UDF (user-defined function) and apply it to each column you want to fix. Here is some sample code:

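```python
import pyspark.sql.functions as f
from pyspark.sql.types import DoubleType

# Clamp negative differences to 0.0 (a float, matching DoubleType); nulls pass through.
correctNegativeDiff = f.udf(
    lambda diff: None if diff is None else (0.0 if diff < 0.0 else diff),
    DoubleType(),
)

df = (df
      .withColumn('time_diff_1', correctNegativeDiff(df.time_diff_1))
      .withColumn('time_diff_2', correctNegativeDiff(df.time_diff_2))
      .withColumn('time_diff_3', correctNegativeDiff(df.time_diff_3)))
```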
Answered by Ali AzG
  • Thank you, this code helped me solve the problem. I just have one small issue: it returns null instead of 0. – Jimmy Oct 27 '18 at 08:34
  • You're welcome. I think it's because of LongType(); changing it to IntegerType() or FloatType() may solve your issue! – Ali AzG Oct 27 '18 at 08:47
  • I have changed it to DoubleType since my values are doubles, but I still get null instead of 0. Thank you. – Jimmy Oct 27 '18 at 08:52
  • Changing it like this may solve that: `f.udf(lambda diff: 0 if diff <= 0 else diff, DoubleType())` – Ali AzG Oct 27 '18 at 09:12
  • If you are using DoubleType, the UDF should also return doubles. Changing the udf as follows should get rid of the null in your case: `f.udf(lambda diff: 0.0 if diff < 0.0 else diff, DoubleType())` – elyptikus Jun 25 '21 at 07:22