The array functions used below (array_except, array_intersect) are available from Spark 2.4. explode_outer works like explode, except that for an empty array it still produces one row, with a null value.
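To make the difference between explode and explode_outer concrete, here is a minimal sketch. The object name ExplodeOuterSketch, the method rowCounts and the two-row input are made up for illustration; only explode and explode_outer come from Spark itself.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, explode_outer}

// Hypothetical demo object, not part of the original answer.
object ExplodeOuterSketch {
  // Returns (rows produced by explode, rows produced by explode_outer).
  def rowCounts(spark: SparkSession): (Long, Long) = {
    import spark.implicits._

    // Made-up input: t02 carries an empty array of messages.
    val demo = Seq(("t01", Seq("m1", "m2")), ("t02", Seq.empty[String]))
      .toDF("time", "messages")

    // explode: an empty array yields no rows, so t02 disappears.
    val inner = demo.select($"time", explode($"messages").as("message"))
    // explode_outer: t02 is kept as one row whose message is null.
    val outer = demo.select($"time", explode_outer($"messages").as("message"))

    inner.show()
    outer.show()
    (inner.count(), outer.count())
  }
}
```

With a local session, ExplodeOuterSketch.rowCounts(spark) should give 2 rows for explode and 3 for explode_outer, the extra one being the null row for t02.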
The idea is to first compute, for each moment, the array of messages that start and the array of messages that end at that moment (start_of and end_of).
Then we keep only the moments at which a message starts or ends, and explode the arrays to get a dataframe with 3 columns (time, start_of, end_of) and one row per message start and per message end. A moment at which m1 and m2 both start produces 2 start rows; a moment at which m1 starts and ends produces 2 rows, an m1 start and an m1 end.
Finally, we use a window function partitioned by 'message' and ordered by time, making sure that if a message starts and ends at the same moment (same time), the start goes first. This guarantees that each start row is immediately followed by its end row.
Pair each start row with the row after it and you have the start and end of each message.
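The same logic can be sketched with plain Scala collections, which is handy for checking the expected result on a small input before running it in Spark. This is only an illustration under my own assumptions: the object IntervalsSketch, the method intervals and the tuple layout are hypothetical, and Seq.diff stands in for array_except (equivalent here because messages within one moment are assumed distinct).

```scala
// Hypothetical plain-Scala sketch of the start/end pairing, no Spark involved.
object IntervalsSketch {
  // Input: (time, messages alive at that time). Output: (message, starts_at, ends_at).
  def intervals(rows: Seq[(String, Seq[String])]): Seq[(String, String, String)] = {
    val sorted = rows.sortBy(_._1)

    // One event per message start and per message end: (message, time, isStart).
    val events = sorted.indices.flatMap { i =>
      val (time, msgs) = sorted(i)
      val pre  = if (i > 0) sorted(i - 1)._2 else Seq.empty[String]
      val post = if (i < sorted.size - 1) sorted(i + 1)._2 else Seq.empty[String]
      val starts = msgs.diff(pre).map(m => (m, time, true))   // not alive in the previous row
      val ends   = msgs.diff(post).map(m => (m, time, false)) // not alive in the next row
      starts ++ ends
    }

    events.groupBy(_._1).toSeq.flatMap { case (msg, evs) =>
      // Order by time; on ties the start goes first (false sorts before true).
      val ordered = evs.sortBy { case (_, time, isStart) => (time, !isStart) }
      // After that ordering every start is directly followed by its end.
      ordered.grouped(2).collect {
        case Seq((_, s, true), (_, e, false)) => (msg, s, e)
      }
    }.sortBy(t => (t._1, t._2))
  }
}
```

For example, IntervalsSketch.intervals(Seq("t01" -> Seq("m1"), "t02" -> Seq("m1", "m2"), "t03" -> Seq("m2"))) gives m1 from t01 to t02 and m2 from t02 to t03.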
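A great exercise to think about.
I've written the example in Scala, but it should be easy to translate. Each line marked showAndContinue prints the example at that stage to show what the step does; showAndContinue is not part of the Spark API, just a helper that shows the dataframe and returns it so the chain can continue.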
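Since showAndContinue is not defined in the snippet, here is one way it could be written. This is a guess at the helper, not necessarily the original definition; the names ShowSyntax and ShowAndContinue are hypothetical.

```scala
import org.apache.spark.sql.Dataset

// Hypothetical helper: adds showAndContinue to any Dataset (a DataFrame is a Dataset[Row]).
object ShowSyntax {
  implicit class ShowAndContinue[T](val ds: Dataset[T]) extends AnyVal {
    // Prints the current state and returns the same dataset unchanged.
    def showAndContinue: Dataset[T] = {
      ds.show()
      ds
    }
  }
}
```

After import ShowSyntax._ the call can be dropped anywhere in a chain, and removing it again does not change the result.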
// Assumes a SparkSession named spark and an input dataframe df with columns time and messages.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

// w orders all rows by time; w2 orders each message's rows by time, start rows first on ties.
val w = Window.partitionBy().orderBy("time")
val w2 = Window.partitionBy("message").orderBy($"time", desc("start_of"))

df.select($"time", $"messages", lag($"messages", 1).over(w).as("pre"), lag("messages", -1).over(w).as("post"))
  // start_of: messages not present in the previous row; end_of: messages not present in the next row
  .withColumn("start_of", when($"pre".isNotNull, array_except(col("messages"), col("pre"))).otherwise($"messages"))
  .withColumn("end_of", when($"post".isNotNull, array_except(col("messages"), col("post"))).otherwise($"messages"))
  .filter(size($"start_of") + size($"end_of") > 0)
  .showAndContinue
  // split each row into a starts-only row and an ends-only row, then explode both arrays
  .select(explode(array(
    struct($"time", $"start_of", array().as("end_of")),
    struct($"time", array().as("start_of"), $"end_of")
  )).as("elem"))
  .select("elem.*")
  .select($"time", explode_outer($"start_of").as("start_of"), $"end_of")
  .select($"time", $"start_of", explode_outer($"end_of").as("end_of"))
  .filter($"start_of".isNotNull || $"end_of".isNotNull)
  .showAndContinue
  .withColumn("message", when($"start_of".isNotNull, $"start_of").otherwise($"end_of"))
  .showAndContinue
  // on a start row, the next row of the same message is its end
  .select($"message", when($"start_of".isNotNull, $"time").as("starts_at"), lag($"time", -1).over(w2).as("ends_at"))
  .filter($"starts_at".isNotNull)
  .showAndContinue
And the tables, in the order they are printed:
+----+--------+--------+--------+--------+--------+
|time|messages|     pre|    post|start_of|  end_of|
+----+--------+--------+--------+--------+--------+
| t01|    [m1]|    null|[m1, m2]|    [m1]|      []|
| t03|[m1, m2]|    [m1]|    [m2]|    [m2]|    [m1]|
| t04|    [m2]|[m1, m2]|    [m3]|      []|    [m2]|
| t06|    [m3]|    [m2]|[m3, m1]|    [m3]|      []|
| t07|[m3, m1]|    [m3]|    [m1]|    [m1]|    [m3]|
| t08|    [m1]|[m3, m1]|    [m2]|      []|    [m1]|
| t11|    [m2]|    [m1]|[m2, m4]|    [m2]|      []|
| t13|[m2, m4]|    [m2]|    [m2]|    [m4]|    [m4]|
| t15|    [m2]|[m2, m4]|    [m4]|      []|    [m2]|
| t20|    [m4]|    [m2]|      []|    [m4]|    [m4]|
| t22|[m1, m4]|      []|    null|[m1, m4]|[m1, m4]|
+----+--------+--------+--------+--------+--------+

+----+--------+------+
|time|start_of|end_of|
+----+--------+------+
| t01|      m1|  null|
| t03|      m2|  null|
| t03|    null|    m1|
| t04|    null|    m2|
| t06|      m3|  null|
| t07|      m1|  null|
| t07|    null|    m3|
| t08|    null|    m1|
| t11|      m2|  null|
| t13|      m4|  null|
| t13|    null|    m4|
| t15|    null|    m2|
| t20|      m4|  null|
| t20|    null|    m4|
| t22|      m1|  null|
| t22|      m4|  null|
| t22|    null|    m1|
| t22|    null|    m4|
+----+--------+------+

+----+--------+------+-------+
|time|start_of|end_of|message|
+----+--------+------+-------+
| t01|      m1|  null|     m1|
| t03|      m2|  null|     m2|
| t03|    null|    m1|     m1|
| t04|    null|    m2|     m2|
| t06|      m3|  null|     m3|
| t07|      m1|  null|     m1|
| t07|    null|    m3|     m3|
| t08|    null|    m1|     m1|
| t11|      m2|  null|     m2|
| t13|      m4|  null|     m4|
| t13|    null|    m4|     m4|
| t15|    null|    m2|     m2|
| t20|      m4|  null|     m4|
| t20|    null|    m4|     m4|
| t22|      m1|  null|     m1|
| t22|      m4|  null|     m4|
| t22|    null|    m1|     m1|
| t22|    null|    m4|     m4|
+----+--------+------+-------+

+-------+---------+-------+
|message|starts_at|ends_at|
+-------+---------+-------+
|     m1|      t01|    t03|
|     m1|      t07|    t08|
|     m1|      t22|    t22|
|     m2|      t03|    t04|
|     m2|      t11|    t15|
|     m3|      t06|    t07|
|     m4|      t13|    t13|
|     m4|      t20|    t20|
|     m4|      t22|    t22|
+-------+---------+-------+
It could be optimised by extracting, from the first table, all the messages that start and end at the same moment, so that their start and end don't have to be matched up again. Whether that is worth it depends on whether this is a common case or only a small share of the rows.
With the optimization it looks like this (same windows):
// Messages that start and end at the same moment move to start_end_here and leave start_of / end_of.
val dfStartEndAndFiniteLife = df.select($"time", $"messages", lag($"messages", 1).over(w).as("pre"), lag("messages", -1).over(w).as("post"))
  .withColumn("start_of", when($"pre".isNotNull, array_except(col("messages"), col("pre"))).otherwise($"messages"))
  .withColumn("end_of", when($"post".isNotNull, array_except(col("messages"), col("post"))).otherwise($"messages"))
  .filter(size($"start_of") + size($"end_of") > 0)
  .withColumn("start_end_here", array_intersect($"start_of", $"end_of"))
  .withColumn("start_of", array_except($"start_of", $"start_end_here"))
  .withColumn("end_of", array_except($"end_of", $"start_end_here"))
  .showAndContinue

// Messages that live for a single moment: starts_at and ends_at are the same time.
// explode without an alias names its output column col, which is why the result is ordered by "col".
val onlyStartEndSameMoment = dfStartEndAndFiniteLife.filter(size($"start_end_here") > 0)
  .select(explode($"start_end_here"), $"time".as("starts_at"), $"time".as("ends_at"))
  .showAndContinue

// Everything else goes through the same explode and window steps as before.
val startEndDifferentMoment = dfStartEndAndFiniteLife
  .filter(size($"start_of") + size($"end_of") > 0)
  .showAndContinue
  .select(explode(array(
    struct($"time", $"start_of", array().as("end_of")),
    struct($"time", array().as("start_of"), $"end_of")
  )).as("elem"))
  .select("elem.*")
  .select($"time", explode_outer($"start_of").as("start_of"), $"end_of")
  .select($"time", $"start_of", explode_outer($"end_of").as("end_of"))
  .filter($"start_of".isNotNull || $"end_of".isNotNull)
  .showAndContinue
  .withColumn("message", when($"start_of".isNotNull, $"start_of").otherwise($"end_of"))
  .showAndContinue
  .select($"message", when($"start_of".isNotNull, $"time").as("starts_at"), lag($"time", -1).over(w2).as("ends_at"))
  .filter($"starts_at".isNotNull)
  .showAndContinue

// union matches columns by position, so col lines up with message.
val result = onlyStartEndSameMoment.union(startEndDifferentMoment)
result.orderBy("col", "starts_at").show()
And the tables, in the order they are printed:
+----+--------+--------+--------+--------+------+--------------+
|time|messages|     pre|    post|start_of|end_of|start_end_here|
+----+--------+--------+--------+--------+------+--------------+
| t01|    [m1]|    null|[m1, m2]|    [m1]|    []|            []|
| t03|[m1, m2]|    [m1]|    [m2]|    [m2]|  [m1]|            []|
| t04|    [m2]|[m1, m2]|    [m3]|      []|  [m2]|            []|
| t06|    [m3]|    [m2]|[m3, m1]|    [m3]|    []|            []|
| t07|[m3, m1]|    [m3]|    [m1]|    [m1]|  [m3]|            []|
| t08|    [m1]|[m3, m1]|    [m2]|      []|  [m1]|            []|
| t11|    [m2]|    [m1]|[m2, m4]|    [m2]|    []|            []|
| t13|[m2, m4]|    [m2]|    [m2]|      []|    []|          [m4]|
| t15|    [m2]|[m2, m4]|    [m4]|      []|  [m2]|            []|
| t20|    [m4]|    [m2]|      []|      []|    []|          [m4]|
| t22|[m1, m4]|      []|    null|      []|    []|      [m1, m4]|
+----+--------+--------+--------+--------+------+--------------+

+---+---------+-------+
|col|starts_at|ends_at|
+---+---------+-------+
| m4|      t13|    t13|
| m4|      t20|    t20|
| m1|      t22|    t22|
| m4|      t22|    t22|
+---+---------+-------+

+----+--------+--------+--------+--------+------+--------------+
|time|messages|     pre|    post|start_of|end_of|start_end_here|
+----+--------+--------+--------+--------+------+--------------+
| t01|    [m1]|    null|[m1, m2]|    [m1]|    []|            []|
| t03|[m1, m2]|    [m1]|    [m2]|    [m2]|  [m1]|            []|
| t04|    [m2]|[m1, m2]|    [m3]|      []|  [m2]|            []|
| t06|    [m3]|    [m2]|[m3, m1]|    [m3]|    []|            []|
| t07|[m3, m1]|    [m3]|    [m1]|    [m1]|  [m3]|            []|
| t08|    [m1]|[m3, m1]|    [m2]|      []|  [m1]|            []|
| t11|    [m2]|    [m1]|[m2, m4]|    [m2]|    []|            []|
| t15|    [m2]|[m2, m4]|    [m4]|      []|  [m2]|            []|
+----+--------+--------+--------+--------+------+--------------+

+----+--------+------+
|time|start_of|end_of|
+----+--------+------+
| t01|      m1|  null|
| t03|      m2|  null|
| t03|    null|    m1|
| t04|    null|    m2|
| t06|      m3|  null|
| t07|      m1|  null|
| t07|    null|    m3|
| t08|    null|    m1|
| t11|      m2|  null|
| t15|    null|    m2|
+----+--------+------+

+----+--------+------+-------+
|time|start_of|end_of|message|
+----+--------+------+-------+
| t01|      m1|  null|     m1|
| t03|      m2|  null|     m2|
| t03|    null|    m1|     m1|
| t04|    null|    m2|     m2|
| t06|      m3|  null|     m3|
| t07|      m1|  null|     m1|
| t07|    null|    m3|     m3|
| t08|    null|    m1|     m1|
| t11|      m2|  null|     m2|
| t15|    null|    m2|     m2|
+----+--------+------+-------+

+-------+---------+-------+
|message|starts_at|ends_at|
+-------+---------+-------+
|     m1|      t01|    t03|
|     m1|      t07|    t08|
|     m2|      t03|    t04|
|     m2|      t11|    t15|
|     m3|      t06|    t07|
+-------+---------+-------+

+---+---------+-------+
|col|starts_at|ends_at|
+---+---------+-------+
| m1|      t01|    t03|
| m1|      t07|    t08|
| m1|      t22|    t22|
| m2|      t03|    t04|
| m2|      t11|    t15|
| m3|      t06|    t07|
| m4|      t13|    t13|
| m4|      t20|    t20|
| m4|      t22|    t22|
+---+---------+-------+