
I have a PySpark DataFrame in the following format:

+------------+---------+
|    date    |  query  |
+------------+---------+
| 2011-08-11 | Query 1 |
| 2011-08-11 | Query 1 |
| 2011-08-11 | Query 2 |
| 2011-08-12 | Query 3 |
| 2011-08-12 | Query 3 |
| 2011-08-13 | Query 1 |
+------------+---------+
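
For reference, here is a minimal snippet that reproduces this DataFrame (assuming an existing SparkSession named spark; the date is a plain string here for brevity, although in my data it is a timestamp):

df = spark.createDataFrame(
    [('2011-08-11', 'Query 1'),
     ('2011-08-11', 'Query 1'),
     ('2011-08-11', 'Query 2'),
     ('2011-08-12', 'Query 3'),
     ('2011-08-12', 'Query 3'),
     ('2011-08-13', 'Query 1')],
    ['date', 'query'])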

I need to transform it so that each unique query becomes a column, grouped by date, with the count of each query filling the rows of the DataFrame. I expect the output to look like this:

+------------+---------+---------+---------+
|    date    | Query 1 | Query 2 | Query 3 |
+------------+---------+---------+---------+
| 2011-08-11 |       2 |       1 |       0 |
| 2011-08-12 |       0 |       0 |       2 |
| 2011-08-13 |       1 |       0 |       0 |
+------------+---------+---------+---------+

I am trying to use this answer as an example, but I don't quite understand the code, especially the return statement in the make_row function.

Is there a way to count the queries while transforming the DataFrame? Maybe something like

import pyspark.sql.functions as func

grouped = (df
    .map(lambda row: (row.date, (row.query, func.count(row.query)))) # Just an example. Not sure how to do this.
    .groupByKey())

It is a DataFrame with potentially hundreds of thousands of rows and queries, so I would prefer an RDD-based approach over the options that use .collect().

Thank you!


1 Answer


You can use groupBy.pivot with count as the aggregation function:

from pyspark.sql.functions import count
df.groupBy('date').pivot('query').agg(count('query')).na.fill(0).orderBy('date').show()

+--------------------+-------+-------+-------+
|                date|Query 1|Query 2|Query 3|
+--------------------+-------+-------+-------+
|2011-08-11 00:00:...|      2|      1|      0|
|2011-08-12 00:00:...|      0|      0|      2|
|2011-08-13 00:00:...|      1|      0|      0|
+--------------------+-------+-------+-------+
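
Note that when the date column is a timestamp, the pivoted output keeps the full timestamp in the header (as shown above). A sketch of the same pipeline with the date rendered as a plain yyyy-MM-dd string first, along the lines of the comments below:

from pyspark.sql.functions import count, date_format

result = (df
    .withColumn('date', date_format('date', 'yyyy-MM-dd'))  # timestamp -> 'yyyy-MM-dd' string
    .groupBy('date')                                        # one row per day
    .pivot('query')                                          # one column per distinct query
    .agg(count('query'))                                     # count occurrences per (date, query)
    .na.fill(0)                                              # queries absent on a date become 0
    .orderBy('date'))
result.show()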
  • I am struggling to perform some operations with the DataFrame in that format. How would I use this command to create the DataFrame, but transposed (i.e. with the dates (timestamps) as headers and each query as a new row)? I have tried `queries_df = df.groupBy('query').pivot('query_time').agg(count('query')).na.fill(0)`, but I can't get the dates on the "x-axis", as headers. – Hannon Queiroz Jun 07 '17 at 21:52
  • Works for me; if you mean the date is not formatted correctly, you might need to convert the date to a string first: `df.withColumn("date", date_format("date", "yyyy-MM-dd")).groupBy('query').pivot('date').agg(count('query')).na.fill(0)`, and also import `from pyspark.sql.functions import date_format`. – Psidom Jun 07 '17 at 22:01
  • Oh, I'm sorry! You're right! Thank you again :) Last follow-up question... now I am trying to sum all columns except the first and add the result as a new column in the DataFrame: `import pyspark.sql.functions as F` and then `newDF = queries_df.withColumn('my_sum', F.sum(queries_df[i] for i in queries_df.columns[1:])).show()`. But this gives me `TypeError: Column is not iterable`. Any ideas? – Hannon Queiroz Jun 07 '17 at 22:52
  • I have asked a more detailed question in [this post](https://stackoverflow.com/questions/44425092/pyspark-create-new-column-from-operations-of-dataframe-columns-gives-error-co). – Hannon Queiroz Jun 08 '17 at 01:14
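
Regarding the column-sum follow-up above: F.sum is an aggregate function, so passing it a Python generator of Columns raises that TypeError. A sketch of a row-wise sum instead, folding the + operator over the pivoted columns (the DataFrame and column names follow the comment above):

from functools import reduce
from operator import add

# Add every column except the first one element-wise, row by row.
newDF = queries_df.withColumn(
    'my_sum',
    reduce(add, [queries_df[c] for c in queries_df.columns[1:]]))
newDF.show()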