0

I am trying to use pivot in Apache Spark.

My data is:

+--------------------+---------+
|           timestamp|  user|
+--------------------+---------+
|2017-12-19T00:41:...|User_1|
|2017-12-19T00:01:...|User_2|
|2017-12-19T00:01:...|User_1|
|2017-12-19T00:01:...|User_1|
|2017-12-19T00:01:...|User_2|
+--------------------+---------+

I want to pivot on the user column.

But I keep getting the error:

'DataFrame' object has no attribute 'pivot'
Traceback (most recent call last):
  File "/usr/hdp/current/spark2-client/python/pyspark/sql/dataframe.py", line 1020, in __getattr__
    "'%s' object has no attribute '%s'" % (self.__class__.__name__, name))
AttributeError: 'DataFrame' object has no attribute 'pivot'

No Matter how I use it.

i.e. df.groupBy('A').pivot('B') or df.pivot('B')

My actual query is:

# The Pivot operation will give timestamp vs Users data
pivot_pf = tf.groupBy(window(tf["timestamp"], "2 minutes"), 'user').count().select('window.start', 'user', 'count').pivot("user").sum("count")

Any help is greatly appreciated.

Thanks.

david nadal
  • 279
  • 4
  • 16
  • df.groupBy('A').pivot('B') should follow by some aggregation. – koiralo May 22 '18 at 10:29
  • @ShankarKoirala thanks, I have updated the question with the real query..I am actually using sum after pivot – david nadal May 22 '18 at 10:31
  • @ShankarKoirala another update on the data format..plz take a look – david nadal May 22 '18 at 10:34
  • don't group by user if you pivot by this field. – vvg May 22 '18 at 10:36
  • @Rumoku... if I dont group by user...i will just get timestamp vs count dataframe ..user info is lost..right? How can then pivot be on the user data – david nadal May 22 '18 at 10:40
  • you need each user to become separate column? that's the way to go! – vvg May 22 '18 at 10:45
  • @Rumoku i get the error cannot resolve '`user`' given input columns: [window, count] ... i was expecting this error..but how are you saying it should work? could you give the query? Thanks. – david nadal May 22 '18 at 10:50
  • Can't understand why its a downvote..either the person didn't read the question ... it is not about using aggregate after pivot.. – david nadal May 22 '18 at 10:53

2 Answers2

1
import pyspark.sql.functions as func
from datetime import datetime

df = spark_session.createDataFrame(
    [[datetime.strptime("2012-01-01 00:00:00", '%Y-%m-%d %H:%M:%S'), 'one'],
     [datetime.strptime("2012-01-01 00:01:00", '%Y-%m-%d %H:%M:%S'), 'two'],
     [datetime.strptime("2012-01-01 00:02:00", '%Y-%m-%d %H:%M:%S'), 'three'],
     [datetime.strptime("2012-01-01 00:03:00", '%Y-%m-%d %H:%M:%S'), 'one'],
     [datetime.strptime("2012-01-01 00:04:00", '%Y-%m-%d %H:%M:%S'), 'two']],
    'dd: timestamp, user: string')


df.groupBy(func.window(df["dd"], "2 minutes")).pivot('user').agg({'dd': 'count'}).show()

Expected output:

+--------------------+----+-----+----+
|              window| one|three| two|
+--------------------+----+-----+----+
|[2012-01-01 00:00...|   1| null|   1|
|[2012-01-01 00:04...|null| null|   1|
|[2012-01-01 00:02...|   1|    1|null|
+--------------------+----+-----+----+
vvg
  • 6,325
  • 19
  • 36
0

Pivot works well as mentioned below. But it returns grouped data. If we use some aggregation on grouped data, it will result into dataframe.

val d1 = Array(("a", "10"), ("b", "20"), ("c", "30"),("a","56"),("c","29"))
val rdd1= sc.parallelize(d1)
val df1 = rdd1.toDF("key","val")
df1.groupBy("key").pivot("val")
Mugdha
  • 112
  • 9