
I have a DataFrame A like this:

+---+---+---+---+----------+
|key| c1| c2| c3|      date|
+---+---+---+---+----------+
| k1| -1|  0| -1|2015-04-28|
| k1|  1| -1|  1|2015-07-28|
| k1|  1|  1|  1|2015-10-28|
| k2| -1|  0|  1|2015-04-28|
| k2| -1|  1| -1|2015-07-28|
| k2|  1| -1|  0|2015-10-28|
+---+---+---+---+----------+

This is the code that creates A:

data = [('k1', '-1', '0', '-1','2015-04-28'),
    ('k1', '1', '-1', '1', '2015-07-28'),
    ('k1', '1', '1', '1', '2015-10-28'),
    ('k2', '-1', '0', '1', '2015-04-28'),
    ('k2', '-1', '1', '-1', '2015-07-28'),
    ('k2', '1', '-1', '0', '2015-10-28')]
A = spark.createDataFrame(data, ['key', 'c1', 'c2','c3','date'])
A = A.withColumn('date',A.date.cast('date'))

For each of the value columns (c1 to c3 here), I want the max date on which the value equals 1 and the max date on which it equals -1. The expected result B is:

+---+----------+----------+----------+----------+----------+----------+
|key|      c1_1|      c2_1|      c3_1|     c1_-1|     c2_-1|     c3_-1|
+---+----------+----------+----------+----------+----------+----------+
| k1|2015-10-28|2015-10-28|2015-10-28|2015-04-28|2015-07-28|2015-04-28|
| k2|2015-10-28|2015-07-28|2015-04-28|2015-07-28|2015-10-28|2015-07-28|
+---+----------+----------+----------+----------+----------+----------+

My previous solution was to handle each column separately with a pivot and then join the newly created DataFrames, as sketched below. But in my situation there are too many columns and this runs into performance problems, so I am hoping for another solution that avoids joining DataFrames.
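Roughly, the current approach looks like this (a sketch only; per_column_pivot is a hypothetical helper name, and the value columns are strings as in the example above):

from functools import reduce
from pyspark.sql.functions import col, max as max_

def per_column_pivot(df, c):
    # One pivot per value column: max date where the column is "1" or "-1"
    return (df.where(col(c).isin("1", "-1"))
              .groupBy("key")
              .pivot(c, ["1", "-1"])
              .agg(max_("date"))
              .select("key",
                      col("1").alias(c + "_1"),
                      col("-1").alias(c + "_-1")))

# One DataFrame per column, then join them all back on key -- this join chain
# is what becomes slow when there are many columns.
B = reduce(lambda left, right: left.join(right, "key", "outer"),
           [per_column_pivot(A, c) for c in ["c1", "c2", "c3"]])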


2 Answers


First melt the DataFrame:

value_vars = ["c1", "c2", "c3"]
a_long = melt(A, id_vars=["key", "date"], value_vars=value_vars)
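melt here is assumed to be a user-defined helper; a minimal sketch of one is below (on Spark >= 3.4 the built-in DataFrame.unpivot / DataFrame.melt can be used instead):

from pyspark.sql.functions import array, col, explode, lit, struct

def melt(df, id_vars, value_vars, var_name="variable", value_name="value"):
    # Wide -> long: build one struct per value column, holding (column name, value)
    vars_and_vals = array(*(
        struct(lit(c).alias(var_name), col(c).alias(value_name))
        for c in value_vars))
    # explode produces one output row per (id_vars, variable, value) combination
    tmp = df.withColumn("_vars_and_vals", explode(vars_and_vals))
    return tmp.select(
        *id_vars,
        col("_vars_and_vals")[var_name].alias(var_name),
        col("_vars_and_vals")[value_name].alias(value_name))

# Spark >= 3.4 equivalent, without the helper:
# a_long = A.unpivot(["key", "date"], value_vars, "variable", "value")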

Drop the zeros:

without_zeros = a_long.where(col("value") != 0)

Merge variable and value:

from pyspark.sql.functions import concat_ws

combined = without_zeros.withColumn(
    "cs", concat_ws("_", col("variable"), col("value")))

Finally pivot:

from pyspark.sql.functions import max

(combined
    .groupBy("key")
    .pivot("cs", ["{}_{}".format(c, i) for c in value_vars for i in [-1, 1]])
    .agg(max("date")))
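Passing the explicit list of expected values to pivot avoids the extra job Spark would otherwise run to compute the distinct values of cs.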

The result is:

+---+----------+----------+----------+----------+----------+----------+
|key|     c1_-1|      c1_1|     c2_-1|      c2_1|     c3_-1|      c3_1|
+---+----------+----------+----------+----------+----------+----------+
| k2|2015-07-28|2015-10-28|2015-10-28|2015-07-28|2015-07-28|2015-04-28|
| k1|2015-04-28|2015-10-28|2015-07-28|2015-10-28|2015-04-28|2015-10-28|
+---+----------+----------+----------+----------+----------+----------+
An alternative is the pandas API on Spark, applying a plain pandas function to each key group:

import pandas as pd

def function1(dd: pd.DataFrame) -> pd.DataFrame:
    # Index by date and sort dates descending, so idxmax/idxmin return the
    # latest date holding the maximum (1) / minimum (-1) value of each column.
    dd = dd.set_index("date")[['c1', 'c2', 'c3']].astype('int').sort_index(ascending=False)
    dd1 = dd.idxmax().to_frame().T.add_suffix("_1")   # latest date with value 1
    dd2 = dd.idxmin().to_frame().T.add_suffix("_-1")  # latest date with value -1
    return pd.concat([dd1, dd2], axis=1)

A.pandas_api().groupby('key').apply(function1).reset_index(level=0)

The result is:

+---+----------+----------+----------+----------+----------+----------+
|key|     c1_-1|      c1_1|     c2_-1|      c2_1|     c3_-1|      c3_1|
+---+----------+----------+----------+----------+----------+----------+
| k2|2015-07-28|2015-10-28|2015-10-28|2015-07-28|2015-07-28|2015-04-28|
| k1|2015-04-28|2015-10-28|2015-07-28|2015-10-28|2015-04-28|2015-10-28|
+---+----------+----------+----------+----------+----------+----------+
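If a regular Spark DataFrame is needed afterwards (assuming the rest of the processing stays in the Spark DataFrame API), the pandas-on-Spark result can be converted back with to_spark(), for example:

B = (A.pandas_api()
      .groupby('key')
      .apply(function1)
      .reset_index(level=0)
      .to_spark())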