
I am using Spark and Scala for learning purposes. I came across a situation wherein I need to check the validity of records present in one of the columns of a Spark DataFrame. This is how I created the first DataFrame, "dataframe1":

import sparkSession.implicits._
val dataframe1 = Seq("AB","BC","CD","DA","AB","BC").toDF("col1")

dataframe1:

+----+
|col1|
+----+
|  AB|
|  BC|
|  CD|
|  DA|
|  AB|
|  BC|
+----+

The validity of a record depends on whether it is "AB" or "BC". Here is my first attempt:

import org.apache.spark.sql.functions._

val dataframe2 = dataframe1.withColumn("col2", when('col1.contains("AB") or 'col1.contains("BC"), "valid").otherwise("invalid"))

dataframe2:

+----+-------+
|col1|   col2|
+----+-------+
|  AB|  valid|
|  BC|  valid|
|  CD|invalid|
|  DA|invalid|
|  AB|  valid|
|  BC|  valid|
+----+-------+

But I don't think this is a good way of doing it, because for every new valid record I would need to add another condition to the "when" clause, which increases the code length and hurts readability.

So I tried putting all the valid records in a list and checking whether the record string is present in that list: if it is, the record is valid; otherwise it is not. Here is the code snippet for this attempt:

val validRecList = Seq("AB", "BC").toList
val dataframe3 = dataframe1.withColumn("col2", if(validRecList.contains('col1.toString())) lit("valid") else lit("invalid"))

But it is not working as expected; the result is:

+----+-------+
|col1|   col2|
+----+-------+
|  AB|invalid|
|  BC|invalid|
|  CD|invalid|
|  DA|invalid|
|  AB|invalid|
|  BC|invalid|
+----+-------+

Can anybody tell me what mistake I am making here? Any other generic suggestions for such a scenario are also welcome. Thank you.

Jitesh Sharma

2 Answers


Try this:

import spark.implicits._
import org.apache.spark.sql.functions._

val dataframe1 = Seq("AB","BC","CD","DA","AB","BC", "XX").toDF("col1").as[(String)]
val validRecList = List("AB", "BC") 

val dataframe2 = dataframe1.withColumn("col2", when($"col1".isin(validRecList: _*), lit("valid")).otherwise(lit("invalid")))
dataframe2.show(false)

returns:

+----+-------+
|col1|col2   |
+----+-------+
|AB  |valid  |
|BC  |valid  |
|CD  |invalid|
|DA  |invalid|
|AB  |valid  |
|BC  |valid  |
|XX  |invalid|
+----+-------+
thebluephantom
  • Thank you @thebluephantom. I tried running this approach previously. There is no syntax error if I write it like: val dataframe3 = dataframe1.withColumn("col2", when('col1.isin(validRecList), "valid").otherwise("invalid")), but I get an error on execution. The error stack is: – Jitesh Sharma Oct 28 '18 at 10:52
  • Runs fine with me - use this approach all the time. Cannot see your error stack. – thebluephantom Oct 28 '18 at 10:59
  • Can you please explain the expression in the when condition, `$"col1".isin(validRecList: _*)`, and how it differs from `$"col1".isin(validRecList)`? – Jitesh Sharma Oct 28 '18 at 11:00
  • Error stack is: Exception in thread "main" java.lang.RuntimeException: Unsupported literal type class scala.collection.immutable.$colon$colon List(AB, BC) at org.apache.spark.sql.catalyst.expressions.Literal$.apply(literals.scala:77) at org.apache.spark.sql.catalyst.expressions.Literal$$anonfun$create$2.apply(literals.scala:163) I am not able to give complete stack in comments. :| – Jitesh Sharma Oct 28 '18 at 11:03
  • Just the way it is. _* means: see https://stackoverflow.com/questions/46398016/notation-in-scala What platform are you running? Ran it again, fine. – thebluephantom Oct 28 '18 at 11:11
  • Yes, I'm able to execute it with your solution. – Jitesh Sharma Oct 31 '18 at 02:47
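To expand on the `: _*` discussion in the comments above: `Column.isin` is declared with a vararg parameter (`def isin(list: Any*)`), so passing a `List` directly does compile (a `List` is an `Any`), but the whole list is then treated as a single argument, which Spark tries to turn into one literal — hence the `Unsupported literal type class scala.collection.immutable.$colon$colon` error from the stack trace. A minimal pure-Scala sketch of the notation (the `count` helper is hypothetical, standing in for a vararg signature like `isin`'s):

```scala
// Hypothetical vararg helper with the same Any* shape as Column.isin.
def count(xs: Any*): Int = xs.length

val validRecList = List("AB", "BC")

// Passing the List directly: the whole List is one argument.
val asOneArg = count(validRecList)       // 1

// `: _*` tells the compiler to expand the List into individual arguments.
val expanded = count(validRecList: _*)   // 2
```

This is why `isin(validRecList: _*)` works while `isin(validRecList)` fails at runtime.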

The dataframe3 code is not working. If we look at the documentation for the withColumn function on Dataset, https://spark.apache.org/docs/2.2.0/api/scala/index.html#org.apache.spark.sql.Dataset, we see that withColumn takes a "String" and a "Column" as its parameter types.

So this code

val dataframe3 = dataframe1.withColumn("col2", if(validRecList.contains('col1.toString())) lit("valid") else lit("invalid"))

This gives col2 as the new column name, but passes either lit("valid") or lit("invalid") as the Column. The expression if (validRecList.contains('col1.toString)) lit("valid") else lit("invalid") is evaluated as plain Scala code, not as a Dataset operation or a Column operation.

That is, if (validRecList.contains('col1.toString)) is evaluated by Scala, not by Spark: the result is "invalid" because validRecList does not contain 'col1 — a Scala Symbol, not any row's data. (By contrast, if you defined val validRecList = Seq('col1, "AB", "BC"), then validRecList.contains('col1) would return true.)
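This can be checked without Spark at all. A small pure-Scala sketch of what the broken expression actually computes (`Symbol("col1")` is what the `'col1` shorthand denotes in Scala 2):

```scala
val validRecList = List("AB", "BC")

// A Symbol's toString is derived from its name (e.g. "'col1" in Scala 2.12);
// it is never a row's data, so it can never match "AB" or "BC".
val symbolText = Symbol("col1").toString

// The membership test is therefore evaluated once, on the driver,
// and is always false:
val col2 = if (validRecList.contains(symbolText)) "valid" else "invalid"
// col2 == "invalid" — which is why every row came out "invalid".
```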

Also, the if expression is not supported on Dataset or Column; the Column-level conditional is when(...).otherwise(...).

If you want a condition inside the withColumn function, you need to express it as a Column-type expression, like this:

dataframe3.withColumn("isContainRecList", $"col1".isin(validRecList: _*))

This $"col1".isin(validRecList: _*) is a Column-type expression because it returns a Column (per the documentation). Alternatively, you can use when(the_condition, value_if_true).otherwise(value_if_false).

So, I think it is important to understand which types the Spark engine works with: if we do not give it a Column-type expression, 'col1 does not refer to the column's data — it is just a Scala symbol.

Also, when you want to use if, you could create a user-defined function (UDF):

import org.apache.spark.sql.functions.udf

val validRecList = List("AB", "BC")
def checkValidRecList(needle: String): String = if (validRecList.contains(needle)) "valid" else "invalid"

val checkUdf = udf[String, String](checkValidRecList)

val dataframe3 = dataframe1.withColumn("col2", checkUdf('col1))

The result is:

scala> dataframe3.show(false)

+----+-------+
|col1|col2   |
+----+-------+
|AB  |valid  |
|BC  |valid  |
|CD  |invalid|
|DA  |invalid|
|AB  |valid  |
|BC  |valid  |
+----+-------+

But remember that this UDF approach is not always recommended, because Spark cannot optimize the code inside a UDF.
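One upside of the UDF approach, though: the core logic is an ordinary Scala function, so it can be unit-tested without a SparkSession. A minimal sketch:

```scala
// The validity check itself is plain Scala and can be verified without Spark:
val validRecList = List("AB", "BC")
def checkValidRecList(needle: String): String =
  if (validRecList.contains(needle)) "valid" else "invalid"

val a = checkValidRecList("AB")  // "valid"
val b = checkValidRecList("CD")  // "invalid"
```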

ByanJati