
I have a Hive query that returns data as such:

Date,Name,Score1,Score2,Avg_Score
1/1/2018,A,10,20,15
1/1/2018,B,20,20,20
1/1/2018,C,15,10,12.5
1/1/2018,D,11,12,11.5
1/1/2018,E,21,29,25
1/1/2018,F,10,21,15.5

I use hive_context.sql(my_query).rdd to get this into an RDD. My ultimate aim is to get this into a JSON format with descending rank based on Avg_Score as follows:

Scores=
[
    {
        "Date": '1/1/2018',
        "Name": 'A',
        "Avg_Score": 15,
        "Rank":4
    },
    {
        "Date": '1/1/2018',
        "Name": 'B',
        "Avg_Score": 20,
        "Rank":2
    }
]
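
To make the expected output concrete, here is a plain-Python sketch (no Spark involved, just the sample rows above) of the dense ranking I'm after:

```python
# Sample rows mirroring the query output (Score1/Score2 dropped)
rows = [
    {"Date": "1/1/2018", "Name": "A", "Avg_Score": 15.0},
    {"Date": "1/1/2018", "Name": "B", "Avg_Score": 20.0},
    {"Date": "1/1/2018", "Name": "C", "Avg_Score": 12.5},
    {"Date": "1/1/2018", "Name": "D", "Avg_Score": 11.5},
    {"Date": "1/1/2018", "Name": "E", "Avg_Score": 25.0},
    {"Date": "1/1/2018", "Name": "F", "Avg_Score": 15.5},
]

# Dense rank: sort the distinct scores descending, then assign
# rank 1 to the highest score, 2 to the next, and so on
distinct_scores = sorted({r["Avg_Score"] for r in rows}, reverse=True)
rank_of = {score: i + 1 for i, score in enumerate(distinct_scores)}
scores = [dict(r, Rank=rank_of[r["Avg_Score"]]) for r in rows]
```

So E gets Rank 1 (25 is the highest average) and A gets Rank 4, matching the JSON above.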

As a first step of getting ranks, I tried implementing this approach but I keep running into errors like AttributeError: 'RDD' object has no attribute 'withColumn'

How would I get this done?

Craig
  • Have you tried not converting to `rdd` and operate on a DataFrame (`withColumn` is a DataFrame function) – pault Aug 09 '18 at 20:19
  • @pault Not sure I understand your question - I already have the RDD – Craig Aug 09 '18 at 20:39
  • 1
    Try `df = hive_context.sql(my_query)` (remove the `.rdd`) – pault Aug 09 '18 at 20:40
  • then you have this to convert your Dataframe to and RDD of dict https://stackoverflow.com/questions/49432167/convert-rows-into-dictionary-in-pyspark – Wilmerton Aug 13 '18 at 09:22

1 Answer


This is because you are working at the RDD level. You have to stay with a Dataset (or DataFrame) if you want to use the DataFrame API. As mentioned in the comments on your question, you can remove the `.rdd` conversion and use `asDict` to get the final result.

df = sc.parallelize([
  ("1/1/2018","A",10,20,15.0),
  ("1/1/2018","B",20,20,20.0),
  ("1/1/2018","C",15,10,12.5),
  ("1/1/2018","D",11,12,11.5),
  ("1/1/2018","E",21,29,25.0),
  ("1/1/2018","F",10,21,15.5)]).toDF(["Date","Name","Score1","Score2","Avg_Score"])

from pyspark.sql import Window
import pyspark.sql.functions as psf

# rank over the whole dataset, ordered by descending Avg_Score
w = Window.orderBy(psf.desc("Avg_Score"))

rddDict = (df
  .withColumn("rank", psf.dense_rank().over(w))  # dense rank: ties share a rank
  .drop("Score1", "Score2")
  .rdd
  .map(lambda row: row.asDict()))

with the result:

>>> rddDict.take(1)
[{'Date': u'1/1/2018', 'Avg_Score': 25, 'Name': u'E', 'rank': 1}]
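
From there, the JSON shape you want is just a serialization step, e.g. with `json.dumps` after collecting. A sketch, assuming the result is small enough to collect to the driver (the `collected` list below stands in for `rddDict.collect()`):

```python
import json

# Stand-in for the Spark call:  collected = rddDict.collect()
collected = [
    {"Date": "1/1/2018", "Name": "E", "Avg_Score": 25.0, "rank": 1},
    {"Date": "1/1/2018", "Name": "B", "Avg_Score": 20.0, "rank": 2},
]

# Serialize the list of dicts to a JSON array
scores_json = json.dumps(collected, indent=4)
```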

Note, however, the warning about using a window function with no partitioning:

18/08/13 11:44:32 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
Wilmerton
  • Even if it doesn't specifically address the order-conserving issue, this thread is an interesting and (partially) relevant read https://stackoverflow.com/questions/30304810/dataframe-ified-zipwithindex – Wilmerton Aug 13 '18 at 10:11