I need to "unexplode" a column of a PySpark dataframe, where the grouping depends on a sequence flag column. E.g.:
Input dataframe:
Expected output dataframe:
You can see that when c1 = 1 on a row, the content of c4 on that row is broken and continues on the next row (because its length is over a limit). When c1 = 0, c4 already holds the full content and nothing is broken. The c4 content can be broken across several following rows.
This is the reverse of pyspark.sql.functions.explode(col) in PySpark, and I need to "unexplode" it, but with column c1 as the condition. It is not a simple group-by-then-collect-list like df.groupby().agg(F.collect_list()), because c1 is a sequence condition.
I tried a window function approach following this topic: PySpark - Append previous and next row to current row. But how can I handle the case where c4 is broken across multiple following rows? (A rough sketch of what I am trying is shown after the sample code below.)
Sample code
from pyspark.sql import SparkSession
spark_session = SparkSession.builder.getOrCreate()
df_in = spark_session.createDataFrame(
    [
        (1, 'a', 'b', 'c1', 'd'),
        (0, 'a', 'b', 'c2', 'd'),
        (0, 'e', 'f', 'g', 'h'),
        (0, '1', '2', '3', '4'),
        (1, 'x', 'y', 'z1', 'k'),
        (1, 'x', 'y', 'z2', 'k'),
        (1, 'x', 'y', 'z3', 'k'),
        (0, 'x', 'y', 'z4', 'k'),
        (1, '6', '7', '81', '9'),
        (0, '6', '7', '82', '9'),
    ],
    ['c1', 'c2', 'c3', 'c4', 'c5']
)
df_out = spark_session.createDataFrame(
    [
        ('a', 'b', 'c1-c2', 'd'),
        ('e', 'f', 'g', 'h'),
        ('1', '2', '3', '4'),
        ('x', 'y', 'z1-z2-z3-z4', 'k'),
        ('6', '7', '81-82', '9'),
    ],
    ['c2', 'c3', 'c4', 'c5']
)
df_in.show()
df_out.show()
How can I solve that? Thank you.
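For reference, here is a rough sketch of the direction I am trying: build a running group id with a window, where a new group starts on every row whose previous row had c1 = 0, then collapse each group. The seq ordering column, the '-' separator, and the use of monotonically_increasing_id to keep row order are my own assumptions (and F.transform with a Python lambda needs Spark 3.1+). I am not sure this is correct or efficient when c4 is broken across many rows.

from pyspark.sql import Window
from pyspark.sql import functions as F

# give every row an explicit order; assumes monotonically_increasing_id
# reflects the original row order, which is not strictly guaranteed
df = df_in.withColumn('seq', F.monotonically_increasing_id())

# single-partition window, only acceptable for small data
w = Window.orderBy('seq')

# a new group starts on the first row and on every row whose previous row had c1 = 0
df = (df
      .withColumn('prev_c1', F.lag('c1').over(w))
      .withColumn('grp_start',
                  F.when(F.col('prev_c1').isNull() | (F.col('prev_c1') == 0), 1).otherwise(0))
      .withColumn('grp', F.sum('grp_start').over(w)))

# collapse each group into one row, joining the c4 pieces in seq order
# (assumes c2, c3, c5 are identical within a group, as in the sample data)
df_res = (df.groupBy('grp', 'c2', 'c3', 'c5')
            .agg(F.concat_ws('-',
                             F.transform(F.array_sort(F.collect_list(F.struct('seq', 'c4'))),
                                         lambda x: x['c4'])).alias('c4'))
            .select('c2', 'c3', 'c4', 'c5'))
df_res.show()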
UPDATED input
df_in = spark_session.createDataFrame(
    [
        ('0', 1, 'a', 'b', 'c1', 'd'),
        ('0', 0, 'a', 'b', 'c2', 'd'),
        ('0', 0, 'e', 'f', 'g', 'h'),
        ('0', 0, '1', '2', '3', '4'),
        ('0', 1, 'x', 'y', 'sele', 'k'),
        ('0', 1, 'x', 'y', 'ct ', 'k'),
        ('0', 1, 'x', 'y', 'from', 'k'),
        ('0', 0, 'x', 'y', 'a', 'k'),
        ('0', 1, '6', '7', '81', '9'),
        ('0', 0, '6', '7', '82', '9'),
    ],
    ['c0', 'c1', 'c2', 'c3', 'c4', 'c5']
)
Expected output (the row rebuilt from the broken c4 pieces):
x| y|select -from-a| k