
I have an rdd that looks like this:

timestamp,user_id,search_id        
[2021-08-14 14:38:31,user_a,null]
[2021-08-14 14:42:01,user_a,ABC]
[2021-08-14 14:55:12,user_a,null]
[2021-08-14 14:56:19,user_a,null] 
[2021-08-14 15:01:36,user_a,null]
[2021-08-14 15:02:22,user_a,null]
[2021-08-15 07:38:07,user_b,XYZ] 
[2021-08-15 07:39:59,user_b,null]    

I would like to associate the events that do not have a search_id with the previous search_id, i.e. fill the null values in "search_id" with the latest non-null value (when there is one), grouped by user_id.

Therefore, my output would look like this:

timestamp,user_id,search_id        
[2021-08-14 14:38:31,user_a,null]
[2021-08-14 14:42:01,user_a,ABC]
[2021-08-14 14:55:12,user_a,ABC]
[2021-08-14 14:56:19,user_a,ABC] 
[2021-08-14 15:01:36,user_a,ABC]
[2021-08-14 15:02:22,user_a,ABC]
[2021-08-15 07:38:07,user_b,XYZ] 
[2021-08-15 07:39:59,user_b,XYZ]    

I found a solution for Spark dataframes that uses org.apache.spark.sql.functions.last and a window (Spark Window function last not null value), but my context doesn't allow me to convert the rdd to a dataframe at the moment, so I was wondering if any of you had an idea of how this could be done on the RDD directly.
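
For reference, the linked dataframe approach looks roughly like this (a sketch, assuming a dataframe named df with the same three columns and real nulls in search_id; the variable names are illustrative):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, last}

// For each user, order events by timestamp and carry the last
// non-null search_id forward (ignoreNulls = true skips the nulls).
val w = Window
  .partitionBy("user_id")
  .orderBy("timestamp")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow)

val filled = df.withColumn("search_id", last(col("search_id"), ignoreNulls = true).over(w))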

justino666

2 Answers


I'd suggest groupBy on user_id (https://spark.apache.org/docs/1.6.2/api/java/org/apache/spark/rdd/RDD.html#groupBy(scala.Function1,%20scala.reflect.ClassTag)) and then a flatMap over each group that fills in the search ids (don't forget to sort the grouped items by timestamp, because groupBy doesn't preserve order). All this presumes you don't have too many items per user; see the sketch below.
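
A minimal sketch of that idea, assuming the rows are (timestamp, user_id, search_id) tuples with search_id represented as an Option[String] (the sample data and variable names are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.appName("fill-search-id").master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Assumed shape: (timestamp, user_id, search_id)
val rdd = sc.parallelize(Seq(
  ("2021-08-14 14:38:31", "user_a", None),
  ("2021-08-14 14:42:01", "user_a", Some("ABC")),
  ("2021-08-14 14:55:12", "user_a", None),
  ("2021-08-15 07:38:07", "user_b", Some("XYZ")),
  ("2021-08-15 07:39:59", "user_b", None)
))

val filled = rdd
  .groupBy { case (_, userId, _) => userId }            // one group per user
  .flatMap { case (_, rows) =>
    // groupBy does not preserve order, so sort each group by timestamp first
    val sorted = rows.toSeq.sortBy { case (ts, _, _) => ts }
    var lastSeen: Option[String] = None
    sorted.map { case (ts, user, searchId) =>
      if (searchId.isDefined) lastSeen = searchId       // remember the latest non-null id
      (ts, user, searchId.orElse(lastSeen))             // fill nulls with it
    }
  }

filled.collect().foreach(println)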


One way to get this done is to work out the maximum number of times the lag() function needs to be applied. Try this.

Input:

val df1=spark.sql("""
-- note: missing search_ids are the literal string 'null' (not SQL NULL);
-- the case expression later in the answer checks for that string
select timestamp'2021-08-14 14:38:31' timestamp, 'user_a' user_id, 'null' search_id union all 
select '2021-08-14 14:42:01' , 'user_a', 'ABC'  union all 
select '2021-08-14 14:55:12' , 'user_a', 'null'  union all 
select '2021-08-14 14:56:19' , 'user_a', 'null'   union all 
select '2021-08-14 15:01:36' , 'user_a', 'null'  union all 
select '2021-08-14 15:02:22' , 'user_a', 'null'  union all 
select '2021-08-15 07:38:07' , 'user_b', 'XYZ'   union all 
select '2021-08-15 07:39:59' , 'user_b', 'null'  
""")
df1.orderBy("timestamp").show(false)
df1.printSchema
df1.createOrReplaceTempView("df1")

+-------------------+-------+---------+
|timestamp          |user_id|search_id|
+-------------------+-------+---------+
|2021-08-14 14:38:31|user_a |null     |
|2021-08-14 14:42:01|user_a |ABC      |
|2021-08-14 14:55:12|user_a |null     |
|2021-08-14 14:56:19|user_a |null     |
|2021-08-14 15:01:36|user_a |null     |
|2021-08-14 15:02:22|user_a |null     |
|2021-08-15 07:38:07|user_b |XYZ      |
|2021-08-15 07:39:59|user_b |null     |
+-------------------+-------+---------+

Now calculate the maximum number of rows per user, which bounds how many times lag() has to be applied:

val max_count = spark.sql(" select max(c) from (select count(*) c from df1 group by user_id)").as[Long].first
max_count: Long = 6

Declare a mutable variable so that we can loop and reassign the result to the same dataframe.

import org.apache.spark.sql.functions.expr

var df2 = df1

// Re-apply the lag fill until the longest possible run of nulls is covered.
for (i <- 1 to max_count.toInt) {
  df2 = df2.withColumn("search_id", expr(""" case when search_id <> 'null' then search_id
                       else lag(search_id) over(partition by user_id order by timestamp) end """))
}
df2.orderBy("timestamp").show(false)

+-------------------+-------+---------+
|timestamp          |user_id|search_id|
+-------------------+-------+---------+
|2021-08-14 14:38:31|user_a |null     |
|2021-08-14 14:42:01|user_a |ABC      |
|2021-08-14 14:55:12|user_a |ABC      |
|2021-08-14 14:56:19|user_a |ABC      |
|2021-08-14 15:01:36|user_a |ABC      |
|2021-08-14 15:02:22|user_a |ABC      |
|2021-08-15 07:38:07|user_b |XYZ      |
|2021-08-15 07:39:59|user_b |XYZ      |
+-------------------+-------+---------+
stack0114106