
I need to "un-explode" a column of a PySpark DataFrame based on a sequence-number condition. E.g.

Input DataFrame (see df_in in the sample code below)

Expected output DataFrame (see df_out in the sample code below)

You can see that when c1 = 1 on a row, the content of the c4 column is broken and continued in the next row (because its length exceeds the limit). When c1 = 0, c4 holds the full (final) piece of content and nothing is carried over. The c4 content can be broken across several consecutive rows.

This is the reverse of pyspark.sql.functions.explode(col) in PySpark, and I need to "un-explode", but with a condition on the c1 column (it is not a simple group-by-then-collect-list, i.e. df.groupby().agg(F.collect_list()), because c1 is a sequence condition).

I tried to use a window function following this topic: PySpark - Append previous and next row to current row. But how can I handle the case where the c4 content is broken across multiple following rows? (A rough sketch of what I tried is shown after the sample code below.)

Sample code

from pyspark.sql import SparkSession
spark_session = SparkSession.builder.getOrCreate()

df_in = spark_session.createDataFrame(
    [
      (1, 'a', 'b', 'c1', 'd'),
      (0, 'a', 'b', 'c2', 'd'),
      (0, 'e', 'f', 'g', 'h'),
      (0, '1', '2', '3', '4'),
      (1, 'x', 'y', 'z1', 'k'),
      (1, 'x', 'y', 'z2', 'k'),
      (1, 'x', 'y', 'z3', 'k'),
      (0, 'x', 'y', 'z4', 'k'),
      (1, '6', '7', '81', '9'),
      (0, '6', '7', '82', '9'),
    ],
    ['c1', 'c2', 'c3', 'c4', 'c5']
)

df_out = spark_session.createDataFrame(
    [
      ('a', 'b', 'c1-c2', 'd'),
      ('e', 'f', 'g', 'h'),
      ('1', '2', '3', '4'),
      ('x', 'y', 'z1-z2-z3-z4', 'k'), 
      ('6', '7', '81-82', '9')
    ],
    ['c2', 'c3', 'c4', 'c5']
)

df_in.show()
df_out.show()
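
For reference, a rough sketch of the direction I tried after reading that topic. The seq, prev_c1, new_grp, grp and pieces column names are made up for this sketch, and monotonically_increasing_id() stands in for the file order my real data has; the problem is that the unpartitioned window pulls everything into a single partition, which is exactly what I want to avoid:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Hypothetical ordering column: my real rows are already in file order,
# so monotonically_increasing_id() stands in for that order here.
df_seq = df_in.withColumn('seq', F.monotonically_increasing_id())

# No partitionBy, so Spark pulls all rows into one partition for this window.
w = Window.orderBy('seq')

# A new c4 group starts on the first row and after every row whose c1 = 0
# (c1 = 0 marks the final piece of a value).
df_grp = df_seq \
    .withColumn('prev_c1', F.lag('c1').over(w)) \
    .withColumn('new_grp', F.when(F.col('prev_c1').isNull() | (F.col('prev_c1') == 0), 1).otherwise(0)) \
    .withColumn('grp', F.sum('new_grp').over(w))

# Re-assemble each group, keeping the piece order via seq.
df_grp.groupBy('grp') \
      .agg(
          F.first('c2').alias('c2'),
          F.first('c3').alias('c3'),
          F.sort_array(F.collect_list(F.struct('seq', 'c4'))).alias('pieces'),
          F.first('c5').alias('c5'),
      ) \
      .select('c2', 'c3', F.concat_ws('-', F.col('pieces.c4')).alias('c4'), 'c5') \
      .show()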

How can I solve this? Thank you.


UPDATED input

df_in = spark_session.createDataFrame(
    [
      ('0', 1, 'a', 'b', 'c1', 'd'),
      ('0', 0, 'a', 'b', 'c2', 'd'),
      ('0', 0, 'e', 'f', 'g', 'h'),
      ('0', 0, '1', '2', '3', '4'),
      ('0', 1, 'x', 'y', 'sele', 'k'),
      ('0', 1, 'x', 'y', 'ct ', 'k'),
      ('0', 1, 'x', 'y', 'from', 'k'),
      ('0', 0, 'x', 'y', 'a', 'k'),
      ('0', 1, '6', '7', '81', '9'),
      ('0', 0, '6', '7', '82', '9'),
    ],
    ['c0', 'c1', 'c2', 'c3', 'c4', 'c5']
)

Output

(screenshot omitted)

Expected output

x| y|select -from-a| k

qxk71551
  • I don't quite understand the effect of the column `c1`. How would the expected output change if, for example, the x-y rows contained a `0` instead of the `1`s in `c1`? – werner Apr 17 '21 at 13:25
  • @werner c1 is always 1 or 0; it is a flag telling you whether the c4 content of the current row is complete or has been cut and continued in the next row (because the length of c4 exceeds the limit). For example: if c4 is nvarchar2(4000) and the content is 4001 characters long, the last character is stored in the next row – qxk71551 Apr 17 '21 at 13:43
  • So the last piece will have 0 and not 1? – anky Apr 17 '21 at 14:20
  • @anky yes, right – qxk71551 Apr 17 '21 at 14:22
  • @anky Also, if there is no need to break at all, c1 = 0 as well – qxk71551 Apr 17 '21 at 14:23
  • @anky Thanks for your solution, but my dataset has many partitions, so when I tried your code it didn't work; I had to repartition to 1 and I don't want that. Thank you so much – qxk71551 Apr 17 '21 at 16:44
  • @qxk71551 NP, my solution was wrong anyway :) – anky Apr 17 '21 at 16:45
  • @anky I re-tested, and your solution works well (in my situation), because when I read the data into a df it has multiple partitions (I don't repartition) and the order is preserved. But when I repartition (as ggordon suggested for testing), the row order changes and your solution no longer works. Still, I don't need to change it – qxk71551 Apr 17 '21 at 19:42
  • @anky Your solution is shorter than ggordon's, but after refactoring I think I only need to group by all columns except c1 and c4 and then collect c4, because after grouping they are unique; the drawback is that I have to group by many columns (>60; a sketch of this is below) – qxk71551 Apr 17 '21 at 19:48
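
A rough sketch of the idea from that last comment, grouping by every column except c1 and c4 without typing out 60+ names (the group_cols name is made up here; note that collect_list gives no ordering guarantee, so this only reproduces the expected c4 when the rows arrive in their original order):

from pyspark.sql import functions as F

# Group by every column except the flag (c1) and the broken column (c4).
group_cols = [c for c in df_in.columns if c not in ('c1', 'c4')]

df_in.groupBy(group_cols) \
     .agg(F.concat_ws('-', F.collect_list('c4')).alias('c4')) \
     .show()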

1 Answer


This solution works even when your data set is in multiple partitions and not ordered.

from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Order the pieces of c4 by their value, with the "continued" flag (c1 = 1) first.
orderByColumns = [F.col('c4'), F.col('c1').cast('int').desc()]
# Group on the columns that stay constant across the pieces of one value.
partitionColumns = [F.col(column) for column in ['c2', 'c3', 'c5']]

df_in.orderBy(orderByColumns)\
     .withColumn('ranked', F.dense_rank().over(Window.partitionBy(partitionColumns).orderBy(orderByColumns)))\
     .withColumn('c4-ranked', F.concat(F.col('ranked'), F.lit('='), F.col('c4')))\
     .groupBy(partitionColumns)\
     .agg(F.collect_list('c4-ranked').alias('c4'))\
     .select(
         F.col('c2'),
         F.col('c3'),
         # Join the collected pieces with "-" and strip the "<rank>=" prefixes.
         F.regexp_replace(F.array_join(F.col('c4'), "-"), r"\d+=", "").alias('c4'),
         F.col('c5')
     )\
     .show()

+---+---+-----------+---+
| c2| c3|         c4| c5|
+---+---+-----------+---+
|  1|  2|          3|  4|
|  x|  y|z1-z2-z3-z4|  k|
|  e|  f|          g|  h|
|  6|  7|      81-82|  9|
|  a|  b|      c1-c2|  d|
+---+---+-----------+---+

Setup

df_in = spark_session.createDataFrame(
    [
      (1, 'a', 'b', 'c1', 'd'),
      (0, 'a', 'b', 'c2', 'd'),
      (0, 'e', 'f', 'g', 'h'),
      (0, '1', '2', '3', '4'),
      (1, 'x', 'y', 'z1', 'k'),
      (1, 'x', 'y', 'z2', 'k'),
      (1, 'x', 'y', 'z3', 'k'),
      (0, 'x', 'y', 'z4', 'k'),
      (1, '6', '7', '81', '9'),
      (0, '6', '7', '82', '9'),
    ],
    ['c1', 'c2', 'c3', 'c4', 'c5']
).repartition(5) 

df_in.show()

Provides on my run (may vary between runs)

+---+---+---+---+---+
| c1| c2| c3| c4| c5|
+---+---+---+---+---+
|  1|  x|  y| z2|  k|
|  0|  x|  y| z4|  k|
|  1|  a|  b| c1|  d|
|  0|  1|  2|  3|  4|
|  0|  6|  7| 82|  9|
|  0|  a|  b| c2|  d|
|  0|  e|  f|  g|  h|
|  1|  6|  7| 81|  9|
|  1|  x|  y| z3|  k|
|  1|  x|  y| z1|  k|
+---+---+---+---+---+
ggordon
  • I wonder, do we really need df_in.orderBy(orderByColumns) at the start? I don't know what it is for – qxk71551 Apr 17 '21 at 18:45
  • Some cases don't pass; your solution is general, but it doesn't keep the sequence condition. For example, see the updated input in the question above – qxk71551 Apr 17 '21 at 18:48
  • I removed c4 from the orderBy column list and it works (the change is sketched below), but I have to keep the c4 order from the raw data – qxk71551 Apr 17 '21 at 18:57
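
For reference, the tweak described in that last comment would look roughly like this (only the changed line from the answer; as the comment says, the c4 piece order then depends on the order the rows already arrive in):

# Order only by the flag, not by c4, so the pieces keep their incoming order.
orderByColumns = [F.col('c1').cast('int').desc()]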