
I am trying to join two Hive tables, omega and card, as follows:

table omega:

+------+--------+-------+-----+-----+
|pid   |enventid|card_id|count|name |
+------+--------+-------+-----+-----+
|111111|"sk"    |"pro"  |2    |"aaa"|
|222222|"sk"    |"pro"  |2    |"ddd"|
+------+--------+-------+-----+-----+

table card:

+-------+---------+
|card_id|card_desc|
+-------+---------+
|"pro"  |"1|2|3"  | 
+-------+---------+

Then I defined a UDF:

val getListUdf = udf((raw: String) => raw.split("|"))

Now I try to join the two tables and apply the UDF:

omega.join(card, Seq("card_id"), "left_outer").withColumn("card_desc", getListUdf(col("card_desc")))

But I got this error:

Caused by: java.lang.NullPointerException
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:25)
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$anonfun$1.apply(<console>:25)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:51)
at org.apache.spark.sql.execution.Project$$anonfun$1$$anonfun$apply$1.apply(basicOperators.scala:49)
......

How should I solve it? Thanks.

– 秦时明月

2 Answers

Apparently you are feeding nulls into your UDF, which causes a NullPointerException (split is called on null). Try:

.withColumn("card_desc", 
            when(
              col("card_desc").isNotNull,
              getListUdf(col("card_desc"))
            )
        )
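
For completeness, a minimal sketch of this approach end-to-end, assuming the omega/card DataFrames and column names from the question; the imports are the standard Spark SQL functions and are not part of the original answer:

import org.apache.spark.sql.functions.{col, udf, when}

// split on an escaped pipe (see the other answer for why the escape is needed)
val getListUdf = udf((raw: String) => raw.split("\\|"))

val result = omega
  .join(card, Seq("card_id"), "left_outer")
  .withColumn(
    "card_desc",
    // the UDF is only invoked for non-null values; unmatched rows keep null
    when(col("card_desc").isNotNull, getListUdf(col("card_desc")))
  )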
– Raphael Roth

Since you are joining both dataframes with a left-outer join, there will be null values in the card_desc column for the rows in the omega dataframe that don't have a matching card_id in the card dataframe. And when the udf function tries to split those null values, you get a NullPointerException.

I would recommend using the inbuilt split function, which handles the null values:

omega.join(card, Seq("card_id"), "left_outer")
  .withColumn("card_desc", split(col("card_desc"), "\\|"))

The split function does exactly the same as what you are doing with your udf function.
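
As a quick check of the null handling, here is a small sketch with a throwaway DataFrame (not the tables from the question; assumes spark.implicits._ is in scope):

import org.apache.spark.sql.functions.{col, split}
import spark.implicits._   // sqlContext.implicits._ on Spark 1.x

val df = Seq(Some("1|2|3"), None).toDF("card_desc")

df.withColumn("card_desc", split(col("card_desc"), "\\|")).show()
// the "1|2|3" row becomes the array [1, 2, 3];
// the null row stays null instead of throwing a NullPointerException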

Or you can change your udf function to handle nulls:

val getListUdf = udf((raw: String) => raw match {
  case null => Array.empty[String]   // rows with no matching card_id
  case _    => raw.split("\\|")
})
– Ramesh Maharjan
  • Why is it "\\|"? – 秦时明月 Apr 01 '18 at 12:31
  • | (pipe) is a special regex character and needs to be escaped; that's why \\ is needed. Otherwise split will not work (illustrated after these comments). – Ramesh Maharjan Apr 01 '18 at 12:32
  • Oh, I see. Thank you! – 秦时明月 Apr 01 '18 at 12:45
  • My pleasure @秦时明月 :) – Ramesh Maharjan Apr 01 '18 at 12:46
  • Building the logic into the UDF is a much better idea than the above accepted solution. – Robert Beatty Dec 21 '18 at 15:11
  • Thanks for the comment @r0bb23, but inbuilt functions are better than udf functions, as udf functions require serialization and deserialization and are not JVM-optimized. – Ramesh Maharjan Dec 21 '18 at 15:42
  • @ramesh That can be true, but you ignore the fact that there is a udf used in both answers, and it lifts performance concerns over all other concerns. Assuming the performance hit isn't large (in this case I don't think there would be much of one, if any), then programmatically having the null handling in the udf is better: it's easier to use and harder to misuse, and you don't have to worry about new people to your code base using the udf without the null-filtering logic. – Robert Beatty Dec 22 '18 at 16:11
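
To illustrate the escaping point raised in the comments, in plain Scala (just String.split, nothing Spark-specific; the exact output can vary slightly by JVM version):

"1|2|3".split("|")    // "|" is an empty regex alternation, so it matches between
                      // every character: Array(1, |, 2, |, 3) rather than the values
"1|2|3".split("\\|")  // escaped pipe matches the literal character: Array(1, 2, 3)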