1

I am working on a spark dataframe. Input dataframe looks like below (Table 1). I need to write a logic to get the keywords with maximum length for each session ids. There are multiple keywords that would be part of output for each sessionid. expected output looks like Table 2.

Input dataframe:

(Table 1)
|-----------+------------+-----------------------------------|
| session_id| value      |  Timestamp                        |
|-----------+------------+-----------------------------------|
|     1     | cat        | 2021-01-11T13:48:54.2514887-05:00 |
|     1     | catc       | 2021-01-11T13:48:54.3514887-05:00 |
|     1     | catch      | 2021-01-11T13:48:54.4514887-05:00 |
|     1     | par        | 2021-01-11T13:48:55.2514887-05:00 |
|     1     | part       | 2021-01-11T13:48:56.5514887-05:00 |
|     1     | party      | 2021-01-11T13:48:57.7514887-05:00 |
|     1     | partyy     | 2021-01-11T13:48:58.7514887-05:00 |
|     2     | fal        | 2021-01-11T13:49:54.2514887-05:00 |
|     2     | fall       | 2021-01-11T13:49:54.3514887-05:00 |
|     2     | falle      | 2021-01-11T13:49:54.4514887-05:00 |
|     2     | fallen     | 2021-01-11T13:49:54.8514887-05:00 |
|     2     | Tem        | 2021-01-11T13:49:56.5514887-05:00 |
|     2     | Temp       | 2021-01-11T13:49:56.7514887-05:00 |
|-----------+------------+-----------------------------------|

Expected Output:

 (Table 2)
|-----------+------------+
| session_id| value      |
|-----------+------------+
|     1     | catch      |
|     1     | partyy     |
|     2     | fallen     |
|     2     | Temp       |
|-----------+------------|

Solution I tried:

I added another column called col_length which captures the length of each word in value column. later on tried to compare each row with its subsequent row to see if it is of maximum lenth. But this solution only works party.

val df = spark.read.parquet("/project/project_name/abc")

val dfM = df.select($"session_id",$"value",$"Timestamp").withColumn("col_length",length($"value"))

val ts = Window
        .orderBy("session_id")
        .rangeBetween(Window.unboundedPreceding, Window.currentRow)

val result = dfM
            .withColumn("running_max", max("col_length") over ts)
            .where($"running_max" === $"col_length")
            .select("session_id", "value", "Timestamp")

Current Output:

|-----------+------------+
| session_id| value      |
|-----------+------------+
|     1     | catch      |
|     2     | fallen     |
|-----------+------------|

Multiple columns does not work inside an orderBy clause with window function so I didn't get desired output.I got 1 output per sesison id. Any suggesions would be highly appreciated. Thanks in advance.

Gabio
  • 9,126
  • 3
  • 12
  • 32
Abhi Sinha
  • 13
  • 4
  • Should (2, Temp) be part of the expected output? For session_id `2` the longest keyword is only `fallen`. – Gabio Mar 21 '22 at 08:01
  • @Gabio- Yes it should be. Not looking for longest keyword per session_id here. If length of the word keeps on increasing then the one with the highest character count should be selected in that series. Once the length of the word start to decrease then the new series starts. If there would have been an extra line for session id 1 in the table1 above then both (1,catch) and (1,partyy) will be the output for session id 1 1 partyy 2021-01-11T13:52:56.7514887-05:00 – Abhi Sinha Mar 21 '22 at 18:00
  • @Gabio- I modified the original question to make it clearer – Abhi Sinha Mar 21 '22 at 18:03

1 Answers1

2

You can solve it by using lead function:

val windowSpec = Window.orderBy("session_id")
dfM
  .withColumn("lead",lead("value",1).over(windowSpec))
  .filter((functions.length(col("lead")) < functions.length(col("value"))) || col("lead").isNull)
  .drop("lead")
  .show
Gabio
  • 9,126
  • 3
  • 12
  • 32