
I have a dataframe (columns c to e have up to 15 variations):

cola, colb, colc_1, cold_1, cole_1, colc_2, cold_2, cole_2...
1,     2,     3,     4,      5,      6,      7,      8

I want this dataframe:

cola, colb, new_col colc, cold, cole, 
1,     2,     _1,     3,    4,     5
1,     2,     _2,     6,    7,     8

Looking to transpose colc to cole and use the suffix of those columns (_1, _2, ..., _15) as the value of the transposed field (new_col).

I am able to do this in Pandas using melt and pivot, but the dataframe in this example is too large to be converted to a Pandas df, so it needs to be done in PySpark or AWS Glue.
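For reference, the melt/pivot route mentioned above might look like this on a small frame (a Pandas sketch; the regex split of the variable names into `measure` and `new_col` is my assumption, not the asker's code):

```python
import pandas as pd

# sample wide frame matching the example above
df = pd.DataFrame([[1, 2, 3, 4, 5, 6, 7, 8]],
                  columns=['cola', 'colb', 'colc_1', 'cold_1', 'cole_1',
                           'colc_2', 'cold_2', 'cole_2'])

# melt to long form, then split 'colc_1' into measure 'colc' and suffix '_1'
long = df.melt(id_vars=['cola', 'colb'])
long[['measure', 'new_col']] = long['variable'].str.extract(r'(col[cde])(_\d+)')

# pivot back so each suffix becomes one row
wide = (long.pivot_table(index=['cola', 'colb', 'new_col'],
                         columns='measure', values='value')
            .reset_index())
```

This works fine in memory but, as noted, does not scale to the actual data size.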

babz

1 Answer


You might try select() and union(). The code below first lays out the basic logic and then uses the reduce() function to eliminate all the intermediate dataframes:

from pyspark.sql import functions as F
from functools import reduce

df = spark.createDataFrame([
        (1,2,3,4,5,6,7,8)
      , (11,12,13,14,15,16,17,18)
      , (21,22,23,24,25,26,27,28)
    ],
    [   'cola', 'colb'
      , 'colc_1', 'cold_1', 'cole_1'
      , 'colc_2', 'cold_2', 'cole_2'
    ])

# create df1 with all columns for new_col = '_1'
df1 = df.select('cola', 'colb', F.lit('_1'), 'colc_1', 'cold_1', 'cole_1')

df1.show()
#+----+----+---+------+------+------+
#|cola|colb| _1|colc_1|cold_1|cole_1|
#+----+----+---+------+------+------+
#|   1|   2| _1|     3|     4|     5|
#|  11|  12| _1|    13|    14|    15|
#|  21|  22| _1|    23|    24|    25|
#+----+----+---+------+------+------+

# do the same for '_2'
df2 = df.select('cola', 'colb', F.lit('_2'), *["col{}_2".format(i) for i in list("cde")])
#+----+----+---+------+------+------+
#|cola|colb| _2|colc_2|cold_2|cole_2|
#+----+----+---+------+------+------+
#|   1|   2| _2|     6|     7|     8|
#|  11|  12| _2|    16|    17|    18|
#|  21|  22| _2|    26|    27|    28|
#+----+----+---+------+------+------+

# then union these two dataframe and adjust the final column names
df_new = df1.union(df2).toDF('cola', 'colb', 'new_col', 'colc', 'cold', 'cole')
df_new.show()
#+----+----+-------+----+----+----+
#|cola|colb|new_col|colc|cold|cole|
#+----+----+-------+----+----+----+
#|   1|   2|     _1|   3|   4|   5|
#|  11|  12|     _1|  13|  14|  15|
#|  21|  22|     _1|  23|  24|  25|
#|   1|   2|     _2|   6|   7|   8|
#|  11|  12|     _2|  16|  17|  18|
#|  21|  22|     _2|  26|  27|  28|
#+----+----+-------+----+----+----+

Next, we can use the reduce() function to handle all groups of columns without the temporary df1, df2, etc. above:

# setup the list of columns to be normalized
normalize_cols = ["col{}".format(c) for c in list("cde")]
# ["colc", "cold", "cole"]    

# change N to 16 to cover new_col from '_1' to '_15'
N = 3

# use reduce to handle all groups
df_new = reduce(
    lambda d1,d2: d1.union(d2)
  , [ df.select('cola', 'colb', F.lit('_{}'.format(i)), *["{}_{}".format(c,i) for c in normalize_cols]) for i in range(1,N) ]
).toDF('cola', 'colb', 'new_col', *normalize_cols)

Another way is using F.array() and F.explode() (use reduce() for all _N):

df.withColumn('d1', F.array(F.lit('_1'), *['col{}_1'.format(c) for c in list("cde")])) \
  .withColumn('d2', F.array(F.lit('_2'), *['col{}_2'.format(c) for c in list("cde")])) \
  .withColumn('h', F.array('d1', 'd2')) \
  .withColumn('h1', F.explode('h')) \
  .select('cola', 'colb', *[ F.col('h1')[i] for i in range(4)]) \
  .toDF('cola', 'colb', 'new_col', 'colc', 'cold', 'cole') \
  .show()

Update Per comment:

To denormalize the dataframe, I use F.array() and then F.collect_list() to group the columns into a list of arrays, and then pull the values out of the groupby() result.

A Window function sets the order of the elements inside collect_list: reference link

from pyspark.sql import Window

N = 3
normalize_cols = ["col{}".format(c) for c in list("cde")]

# win spec so that element in collect_list are sorted based on 'new_col'
win = Window.partitionBy('cola', 'colb').orderBy('new_col')

df_new.withColumn('cols', F.array(normalize_cols)) \
      .withColumn('clist', F.collect_list('cols').over(win)) \
      .groupby('cola', 'colb').agg(F.last('clist').alias('clist1')) \
      .select('cola', 'colb', *[ F.col('clist1')[i].alias('c{}'.format(i)) for i in range(N-1)]) \
      .select('cola', 'colb', *[ F.col('c{}'.format(i))[j].alias('{}_{}'.format(normalize_cols[j],i+1)) for i in range(N-1) for j in range(len(normalize_cols)) ]) \
      .show()    

# +----+----+------+------+------+------+------+------+                           
# |cola|colb|colc_1|cold_1|cole_1|colc_2|cold_2|cole_2|
# +----+----+------+------+------+------+------+------+
# |  11|  12|    13|    14|    15|    16|    17|    18|
# |  21|  22|    23|    24|    25|    26|    27|    28|
# |   1|   2|     3|     4|     5|     6|     7|     8|
# +----+----+------+------+------+------+------+------+

Some Explanations:

  • F.last() in groupby().agg() returns the full collect_list built by the Window function, since the Window's partitionBy matches the groupby keys
  • the 1st select() converts the collect_list() into c0 and c1
  • the 2nd select() converts c0 into colc_1, cold_1, cole_1 and c1 into colc_2, cold_2, cole_2
jxc
  • @jxc- this was amazing, thank you. Would you be able to assist in reversing this logic? (i.e. start with the 'want' dataframe and change it back to the 'have' dataframe) – babz Apr 30 '19 at 14:05
  • @babz, added a section `Update Per comment` to reflect my options to your question. – jxc Apr 30 '19 at 18:13