
I have an Apache Spark DataFrame of the following format:

| ID |  groupId  | phaseName |
|----|-----------|-----------|
| 10 | someHash1 | PhaseA    |
| 11 | someHash1 | PhaseB    |
| 12 | someHash1 | PhaseB    |
| 13 | someHash2 | PhaseX    |
| 14 | someHash2 | PhaseY    |

Each row represents a phase of a procedure that consists of several such phases. The ID column defines the sequential order of the phases, and the groupId column indicates which phases belong to the same procedure.

I want to add a new column to the DataFrame: previousPhaseName. This column should indicate the previous different phase from the same procedure. The first phase of a procedure (the one with the minimum ID) will have null as its previous phase. When a phase occurs twice or more, the second (third, ...) occurrence will have the same previousPhaseName as its first occurrence. For example:

df = 
| ID |  groupId  | phaseName | prevPhaseName |
|----|-----------|-----------|---------------|
| 10 | someHash1 | PhaseA    | null          |
| 11 | someHash1 | PhaseB    | PhaseA        |
| 12 | someHash1 | PhaseB    | PhaseA        |
| 13 | someHash2 | PhaseX    | null          |
| 14 | someHash2 | PhaseY    | PhaseX        |

I am not sure how to implement this. My first approach would be (a rough sketch follows the list):

  • create a second, empty dataframe df2
  • for each row in df: find the row with groupId = row.groupId, ID < row.ID, and the maximum ID, and add that row to df2
  • join df and df2
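
A rough, untested sketch of that join-based idea (the prev* column aliases and the withPrev name are just made up for the example; note that the join condition also has to require a different phaseName, otherwise it only yields the ordinary previous phase):

// Assumes: import static org.apache.spark.sql.functions.*;
// plus org.apache.spark.sql.Dataset and org.apache.spark.sql.Row
Dataset<Row> prev = df.select(
    col("ID").as("prevId"),
    col("groupId").as("prevGroupId"),
    col("phaseName").as("prevPhaseName"));

Dataset<Row> withPrev = df
  .join(prev,
        col("groupId").equalTo(col("prevGroupId"))
          .and(col("prevId").lt(col("ID")))
          .and(col("prevPhaseName").notEqual(col("phaseName"))),
        "left_outer")
  // keep, per original row, only the candidate with the highest prevId
  .groupBy(col("ID"), col("groupId"), col("phaseName"))
  .agg(max(struct(col("prevId"), col("prevPhaseName")))
         .getField("prevPhaseName").as("previousPhaseName"));

A self-join plus aggregation like this shuffles the data more than once, which is one reason I would prefer a window-based solution.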

Partial Solution using Window Functions

I used Window Functions to aggregate the name of the previous phase, the number of previous occurrences (not necessarily in a row) of the current phase within the group, and whether the current and previous phase names are equal:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.when;

import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.functions;

// all rows of the group, ordered by ID
WindowSpec windowSpecPrev = Window
  .partitionBy(df.col("groupId"))
  .orderBy(df.col("ID"));
// all previous rows of the group with the same phaseName, up to and including the current row
WindowSpec windowSpecCount = Window
  .partitionBy(df.col("groupId"), df.col("phaseName"))
  .orderBy(df.col("ID"))
  .rowsBetween(Long.MIN_VALUE, 0);

df
  .withColumn("prevPhase", functions.lag("phaseName", 1).over(windowSpecPrev))
  .withColumn("phaseCount", functions.count("phaseName").over(windowSpecCount))
  .withColumn("prevSame", when(col("prevPhase").equalTo(col("phaseName")), 1).otherwise(0));

df = 
| ID |  groupId  | phaseName | prevPhase   | phaseCount | prevSame |
|----|-----------|-----------|-------------|------------|----------|
| 10 | someHash1 | PhaseA    | null        |  1         |  0       |
| 11 | someHash1 | PhaseB    | PhaseA      |  1         |  0       |
| 12 | someHash1 | PhaseB    | PhaseB      |  2         |  1       |
| 13 | someHash2 | PhaseX    | null        |  1         |  0       |
| 14 | someHash2 | PhaseY    | PhaseX      |  1         |  0       |

This is still not what I wanted to achieve, but it is good enough for now.

Further Ideas

To get the name of the previous distinct phase, I see three possibilities that I have not investigated thoroughly:

  • Implement my own lag function that does not take an offset but recursively checks the previous line until it finds a value that is different from the given line. (Though I don't think it's possible to use own analytic window functions in Spark SQL.)
  • Find a way to dynamically set the offset of the lag function according to the value of phaseCount. (That may fail if the previous occurrences of the same phaseName do not appear in a single sequence.)
  • Use a UserDefinedAggregateFunction over the window that stores the ID and phaseName of the first given input and searches for the highest ID with a different phaseName. (A rough skeleton of this idea is sketched below.)
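
As a rough, untested skeleton of the third idea (class name, buffer layout and the merge logic are my own assumptions, and I have not verified in which Spark versions a UserDefinedAggregateFunction can be evaluated over a running window frame), something like this could track the most recent phaseName that differs from the current one:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

public class PrevDifferentPhase extends UserDefinedAggregateFunction {

  @Override
  public StructType inputSchema() {
    return new StructType().add("phaseName", DataTypes.StringType);
  }

  @Override
  public StructType bufferSchema() {
    // lastPhase: phaseName of the last row seen; prevDiff: last phaseName that differed from it
    return new StructType()
        .add("lastPhase", DataTypes.StringType)
        .add("prevDiff", DataTypes.StringType);
  }

  @Override
  public DataType dataType() {
    return DataTypes.StringType;
  }

  @Override
  public boolean deterministic() {
    return true;
  }

  @Override
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, null);
    buffer.update(1, null);
  }

  @Override
  public void update(MutableAggregationBuffer buffer, Row input) {
    String current = input.getString(0);
    String lastPhase = buffer.getString(0);
    if (lastPhase != null && !lastPhase.equals(current)) {
      buffer.update(1, lastPhase); // the row before this one had a different phase
    }
    buffer.update(0, current);
  }

  @Override
  public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
    // Only meaningful if buffer2 holds the later rows; a production version would
    // need a well-defined merge strategy for arbitrary buffer splits.
    if (buffer2.getString(1) != null) {
      buffer1.update(1, buffer2.getString(1));
    }
    if (buffer2.getString(0) != null) {
      buffer1.update(0, buffer2.getString(0));
    }
  }

  @Override
  public Object evaluate(Row buffer) {
    return buffer.getString(1);
  }
}

It would then be applied over a running frame partitioned by groupId and ordered by ID, e.g. new PrevDifferentPhase().apply(col("phaseName")).over(windowSpecPrev), assuming UDAFs over windows work as hoped.
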
moe

2 Answers


I was able to solve this problem in the following way:

  1. Get the (ordinary) previous phase.
  2. Introduce a new id that groups phases that occur in sequential order (with the help of this answer). This takes two steps: first, checking whether the current and previous phase names are equal and assigning a seqCount value accordingly; second, computing a cumulative sum over this value.
  3. Assign the previous phase of the first row of a sequential group to all its members.

Implementation

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// all rows of a group, ordered by ID
WindowSpec specGroup = Window.partitionBy(col("groupId"))
                             .orderBy(col("ID"));
// running frame over a group, used for the cumulative sum
WindowSpec specSeqGroupId = Window.partitionBy(col("groupId"))
                                  .orderBy(col("ID"))
                                  .rowsBetween(Long.MIN_VALUE, 0);
// running frame over a sequential group of equal phases
WindowSpec specPrevDiff = Window.partitionBy(col("groupId"), col("seqGroupId"))
                                .orderBy(col("ID"))
                                .rowsBetween(Long.MIN_VALUE, 0);

df.withColumn("prevPhase", coalesce(lag("phaseName", 1).over(specGroup), lit("NO_PREV")))
  .withColumn("seqCount", when(col("prevPhase").equalTo(col("phaseName")).or(col("prevPhase").equalTo("NO_PREV")), 0).otherwise(1))
  .withColumn("seqGroupId", sum("seqCount").over(specSeqGroupId))
  .withColumn("prevDiff", first("prevPhase").over(specPrevDiff));

Result

df = 
| ID |  groupId  | phaseName | prevPhase | seqCount | seqGroupId | prevDiff |
|----|-----------|-----------|-----------|----------|------------|----------|
| 10 | someHash1 | PhaseA    | NO_PREV   |  0       |  0         | NO_PREV  |
| 11 | someHash1 | PhaseB    | PhaseA    |  1       |  1         | PhaseA   |
| 12 | someHash1 | PhaseB    | PhaseB    |  0       |  1         | PhaseA   |
| 13 | someHash2 | PhaseX    | NO_PREV   |  0       |  0         | NO_PREV  |
| 14 | someHash2 | PhaseY    | PhaseX    |  1       |  1         | PhaseX   |
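
To end up with exactly the prevPhaseName column asked for, the NO_PREV marker can be mapped back to null and the helper columns dropped (a small, untested addition, assuming the result of the chain above is assigned back to df):

df = df.withColumn("prevPhaseName",
         when(col("prevDiff").equalTo("NO_PREV"), lit(null)).otherwise(col("prevDiff")))
       .drop("prevPhase", "seqCount", "seqGroupId", "prevDiff");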

Any suggestions, especially in terms of the efficiency of these operations, are appreciated.

moe
  • I could not find another way to approach this, did you ever find a better way? thanks btw, very clever on getting the FIRST(prev..) of whenever the change happens – dim_user Mar 03 '20 at 10:10
  • unfortunately I did not look further into it, cool that it helped you though – moe Mar 03 '20 at 14:26

I guess you can use Spark window (row frame) functions. Check the API documentation and the following post.

https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html
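
For example, a running row frame over each group could look roughly like this (just a generic sketch of the mechanism, not a full solution to the question):

// Assumes: import static org.apache.spark.sql.functions.*;
// plus org.apache.spark.sql.expressions.Window and WindowSpec
WindowSpec w = Window.partitionBy(col("groupId"))
                     .orderBy(col("ID"))
                     .rowsBetween(Long.MIN_VALUE, 0); // all rows up to and including the current one
df.withColumn("runningCount", count("ID").over(w));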

takaomag
  • Thanks for the advice. I haven't solved the problem yet but it definitely pushed me in the right direction. – moe Sep 29 '17 at 08:11