
I am working with Spark DataFrames and want to update a column column_to_be_updated in a Hive table using Spark SQL in Scala.

My code so far works with smaller dataframes:

var data_frame = spark.sql("Select ... From TableXX")

var id_list = spark.sql("Select Id From TableXY Where ...").collect().map(_(0)).toList

data_frame = data_frame.withColumn("column_to_be_updated",
    when($"other_column_of_frame".isin(id_list:_*), 1)
    .otherwise($"column_to_be_updated"))

What I want is to update the column column_to_be_updated if the entry in other_column_of_frame is in the Id column of TableXY. My workaround is to collect the Id column into a list first and then use the .isin statement.

However, TableXY and TableXX contain a lot of rows, so collecting all the ids into id_list overloads the driver and the job crashes. Is there another workaround or a more efficient solution for what I am trying to achieve?

Thanks in advance!

dnks23

1 Answer


You can join the dataframes using a left outer join. This way, the Id column from TableXY is attached to data_frame on the rows where other_column_of_frame matches one of the ids, and is null everywhere else. Then simply check whether the newly added Id column is null or not.

import org.apache.spark.sql.functions.{broadcast, when}
import spark.implicits._

val ids = spark.sql("Select Id From TableXY Where ...")
val updated = data_frame
  .join(broadcast(ids), ids.col("Id") === data_frame.col("other_column_of_frame"), "left_outer")
  .withColumn("column_to_be_updated", when($"Id".isNotNull, 1).otherwise($"column_to_be_updated"))
  .drop("Id")

You can read about broadcast joins here: DataFrame join optimization - Broadcast Hash Join
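To make the pattern concrete, here is a minimal, self-contained sketch. The sample data, the local SparkSession, and the values in the two tables are assumptions purely for illustration; only the column names come from the question:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{broadcast, when}

val spark = SparkSession.builder().master("local[*]").appName("join-update").getOrCreate()
import spark.implicits._

// Hypothetical stand-ins for TableXX and TableXY
val data_frame = Seq((10, 0), (20, 5), (30, 0))
  .toDF("other_column_of_frame", "column_to_be_updated")
val ids = Seq(10, 30).toDF("Id")

// Left outer join: Id is non-null exactly where other_column_of_frame matched an id
val updated = data_frame
  .join(broadcast(ids), $"Id" === $"other_column_of_frame", "left_outer")
  .withColumn("column_to_be_updated", when($"Id".isNotNull, 1).otherwise($"column_to_be_updated"))
  .drop("Id")

updated.show()
```

With this toy data, the rows with other_column_of_frame 10 and 30 end up with column_to_be_updated = 1, while the row with 20 keeps its original value. Nothing is collected to the driver, so the approach scales with the size of TableXY (the broadcast hint is appropriate as long as the id table fits in executor memory).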

Shaido
  • I don't think this is what I want to achieve. I want to update a column to value `1` if the id is in the list as stated in my initial post. – dnks23 May 08 '19 at 09:50
  • @dnks23: You are correct, I updated the answer to take that into consideration. – Shaido May 08 '19 at 10:03