I have two dataframes.

df1:

    +---------------+-------------------+-----+------------------------+------------------------+---------+
    |id             |dt                 |speed|stats                   |lag_stat                |lag_speed|
    +---------------+-------------------+-----+------------------------+------------------------+---------+
    |358899055773504|2018-07-31 18:38:36|0    |[9, -1, -1, 13, 0, 1, 0]|null                    |null     |
    |358899055773504|2018-07-31 18:58:34|0    |[9, 0, -1, 22, 0, 1, 0] |[9, -1, -1, 13, 0, 1, 0]|0        |
    |358899055773505|2018-07-31 18:54:23|4    |[9, 0, 0, 22, 1, 1, 1]  |null                    |null     |
    +---------------+-------------------+-----+------------------------+------------------------+---------+

df2:

    +---------------+-------------------+-----+------------------------+
    |id             |dt                 |speed|stats                   |
    +---------------+-------------------+-----+------------------------+
    |358899055773504|2018-07-31 18:38:34|0    |[9, -1, -1, 13, 0, 1, 0]|
    |358899055773505|2018-07-31 18:48:23|4    |[8, -1, 0, 22, 1, 1, 1] |
    +---------------+-------------------+-----+------------------------+

I want to replace the null values in the columns lag_stat and lag_speed in df1 with the values of stats and speed from dataframe df2 for rows with the same id.

Desired output looks like this:

    +---------------+-------------------+-----+------------------------+------------------------+---------+
    |             id|                 dt|speed|                   stats|                lag_stat|lag_speed|
    +---------------+-------------------+-----+------------------------+------------------------+---------+
    |358899055773504|2018-07-31 18:38:36|    0|[9, -1, -1, 13, 0, 1, 0]|[9, -1, -1, 13, 0, 1, 0]|        0|
    |358899055773504|2018-07-31 18:58:34|    0| [9, 0, -1, 22, 0, 1, 0]|[9, -1, -1, 13, 0, 1, 0]|        0|
    |358899055773505|2018-07-31 18:54:23|    4|  [9, 0, 0, 22, 1, 1, 1]| [8, -1, 0, 22, 1, 1, 1]|        4|
    +---------------+-------------------+-----+------------------------+------------------------+---------+
experiment

1 Answer


One possible way could be to join the two DataFrames and then apply some when functions on those columns.

For example, this:

    import org.apache.spark.sql.functions.when

    val output = df1.join(df2, df1.col("id") === df2.col("id"))
          .select(df1.col("id"),
                  df1.col("dt"),
                  df1.col("speed"),
                  df1.col("stats"),
                  when(df1.col("lag_stat").isNull, df2.col("stats")).otherwise(df1.col("lag_stat")).alias("lag_stats"),
                  when(df1.col("lag_speed").isNull, df2.col("speed")).otherwise(df1.col("lag_speed")).alias("lag_speed")
          )

will give you the expected output:

    +---------------+-------------------+-----+------------------+------------------+---------+
    |             id|                 dt|speed|             stats|         lag_stats|lag_speed|
    +---------------+-------------------+-----+------------------+------------------+---------+
    |358899055773504|2018-07-31 18:38:36|    0|[9,-1,-1,13,0,1,0]|[9,-1,-1,13,0,1,0]|        0|
    |358899055773504|2018-07-31 18:58:34|    0| [9,0,-1,22,0,1,0]|[9,-1,-1,13,0,1,0]|        0|
    |358899055773505|2018-07-31 18:54:23|    4|  [9,0,0,22,1,1,1]| [8,-1,0,22,1,1,1]|        4|
    +---------------+-------------------+-----+------------------+------------------+---------+
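As a side note, the same null-fallback can be written more compactly with coalesce, which returns the first non-null of its arguments. Below is a minimal, self-contained sketch of that variant; CoalesceExample and run are illustrative names, and the tiny in-memory frames stand in for df1/df2 (the stats arrays are abbreviated to strings "s0", "s1", "s2" for brevity):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.coalesce

object CoalesceExample {
  def run(): DataFrame = {
    val spark = SparkSession.builder.master("local[1]").appName("coalesce-demo").getOrCreate()
    import spark.implicits._

    // Stand-ins for df1 and df2: one row with nulls in lag_stat/lag_speed, one without
    val df1 = Seq(
      ("358899055773504", 0, "s1", Option.empty[String], Option.empty[Int]),
      ("358899055773504", 0, "s2", Some("s1"), Some(0))
    ).toDF("id", "speed", "stats", "lag_stat", "lag_speed")
    val df2 = Seq(("358899055773504", 4, "s0")).toDF("id", "speed", "stats")

    // coalesce picks the first non-null value, replacing the when/otherwise pair
    df1.join(df2, df1.col("id") === df2.col("id"))
      .select(df1.col("id"), df1.col("speed"), df1.col("stats"),
        coalesce(df1.col("lag_stat"), df2.col("stats")).alias("lag_stat"),
        coalesce(df1.col("lag_speed"), df2.col("speed")).alias("lag_speed"))
  }

  def main(args: Array[String]): Unit = run().show()
}
```

Both forms produce the same result here; coalesce just avoids repeating each column three times.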
pheeleeppoo
  • Thanks for your response, but how can we do it without using joins? Isn't join a costly operation? – experiment Sep 14 '18 at 18:40
  • @experiment You can't perform any operations like this on two separate dataframes without joining them first. – Shaido Sep 15 '18 at 02:21
  • @Shaido hi sir, I have one question not related to the above: I am doing some kind of processing in Spark and want to implement a functionality such that, regardless of the processing which is running, I can schedule a timer (at an interval of 5 minutes) which will persist some data into Cassandra (or let's say any other source). How can I do so? Any kind of insight will be helpful :) – experiment Oct 05 '18 at 05:00
  • @experiment: It sounds like you want to use streaming data; you can take a look at Structured Streaming and use Cassandra as a sink: https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#quick-example – Shaido Oct 05 '18 at 06:01
  • @Shaido I want to implement a timer thread; to me it sounds like two tasks running in parallel, one keeping track of the 5-minute interval and the other doing all the processing I have told it to do. I am doing processing on the streaming data, caching the output of that processing in Spark as a temp table, and that cached table is used somewhere again in the Spark script, but only after some interval do I want to persist it in Cassandra. I am unable to get the idea from the link you shared; should I post a question so that you can provide a written solution? – experiment Oct 05 '18 at 06:20
  • @experiment: I don't think there is any way to use multiple threads in this way with Spark, but I could be wrong (you could create a question regarding this to see what others say). If you use streaming data you could create a Cassandra sink to save at regular intervals: https://stackoverflow.com/questions/50037285/writing-spark-structure-streaming-data-into-cassandra – Shaido Oct 05 '18 at 07:04