1

I have been trying to push a particular row in a Spark DataFrame to the end of the DataFrame. This is what I have tried so far.

Input DataFrame:

+-------------+-------+------------+
|expected_date|count  |Downstream  |
+-------------+-------+------------+
|2018-08-26   |1      |abc         |
|2018-08-26   |6      |Grand Total |
|2018-08-26   |3      |xyy         |
|2018-08-26   |2      |xxx         |
+-------------+-------+------------+

Code:

    import org.apache.spark.sql.functions.{col, when}

    df.withColumn("Downstream_Hierarchy",
        when(col("Downstream") === "Grand Total", 2).otherwise(1))
      .orderBy(col("Downstream_Hierarchy").asc)
      .drop("Downstream_Hierarchy")

Output DataFrame:

+-------------+-------+------------+
|expected_date|count  |Downstream  |
+-------------+-------+------------+
|2018-08-26   |1      |abc         |
|2018-08-26   |3      |xyy         |
|2018-08-26   |2      |xxx         |
|2018-08-26   |6      |Grand Total |
+-------------+-------+------------+

Is there a simpler way to do this?

Dasarathy D R
  • what is your end goal here? – Assaf Mendelson Aug 27 '18 at 06:24
  • @AssafMendelson: I need to find the total count for a use case and publish it in an email. The Grand Total has to be the last row so the data makes more sense. Also, I have edited the question, please note. – Dasarathy D R Aug 27 '18 at 06:34
  • Looks simpler than the answer in fact. – thebluephantom Aug 27 '18 at 07:01
  • If you send it by email, I assume the result is relatively small. This means you translate it somehow to an "email" (e.g. by doing collect). Why not do the ordering of the last line there (i.e. outside the dataframe)? – Assaf Mendelson Aug 27 '18 at 07:25
  • @AssafMendelson: Missed a point. Actually I do not directly send an email. Eventually I write it to HDFS. From there, the downstream tends to collect the data in the part-00000 file and send it via email. Whatever I commented for your first end-goal question was at a high level. – Dasarathy D R Aug 27 '18 at 07:28
  • In this case you should first add a coalesce(1) to make sure the data is indeed in a single partition, then you can do the filter idea from Sanket9394 but instead of doing a union, do a manual append to the hdfs file – Assaf Mendelson Aug 27 '18 at 07:34
  • There is no "last" row in Spark since it's distributed. Each time you perform an action, you'll have to sort again, which might be inefficient – BlueSheepToken Aug 27 '18 at 08:15
  • If all your data is eventually going to reside in a single partition, `orderBy` will not be as bad an option as you think. Just `order by total asc` right before writing, since the final total will always be the greatest number in the DataFrame. – philantrovert Aug 27 '18 at 08:16
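
A minimal sketch of that last suggestion, assuming the totals live in the count column; repartition(1) plus sortWithinPartitions keeps the sort inside the single output partition, so the "Grand Total" row (the greatest count) lands last in the written file. The output path is hypothetical.

    import org.apache.spark.sql.functions.col

    df.repartition(1)                          // single partition, as suggested above
      .sortWithinPartitions(col("count").asc)  // greatest count, i.e. "Grand Total", ends up last
      .write
      .option("header", "true")
      .csv("hdfs:///tmp/report")               // hypothetical output directory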

3 Answers

2

Going through your comments: since the end result is needed in HDFS, you can write it to HDFS as CSV twice.

The first time, write the DataFrame without the "Grand Total" row. The second time, write the "Grand Total" row alone, with save mode "append".
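
A minimal sketch of that idea, assuming the aggregation is already done. The output path is hypothetical, and note that "append" mode adds a new part file under the same directory rather than extending an existing file:

    import org.apache.spark.sql.functions.col

    val outPath = "hdfs:///tmp/report"  // hypothetical output directory

    // 1st write: everything except the "Grand Total" row
    df.filter(col("Downstream") =!= "Grand Total")
      .coalesce(1)
      .write.mode("overwrite").option("header", "true").csv(outPath)

    // 2nd write: the "Grand Total" row alone, appended as an extra part file
    df.filter(col("Downstream") === "Grand Total")
      .coalesce(1)
      .write.mode("append").option("header", "false").csv(outPath)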

0

DataFrame without the required row:

val df1 = df.filter(col("Downstream") =!= "Grand Total")

DataFrame with the required row:

val df2 = df.filter(col("Downstream") === "Grand Total")

Required DataFrame:

val df_final = df1.union(df2)

This might not be the best solution, but it avoids the expensive orderBy operation.

Sanket9394
    This does not guarantee that this would be the last row. It would depend on the order of the partitions – Assaf Mendelson Aug 27 '18 at 07:22
  • @AssafMendelson Exactly! I have the same doubt about my current code. Even though I have done the orderBy, before I write to HDFS I do a df.repartition(1). Having said that, will it shuffle the data into different partitions of the DataFrame, causing the "Grand Total" row to move to a different position? If yes, then I need to find an alternative spot for my repartition. – Dasarathy D R Aug 27 '18 at 07:32
  • You can try to do a coalesce(1) BEFORE doing the filters, assuming all aggregation was done beforehand this should give you a single partition, however, I am not sure this would still guarantee the order. – Assaf Mendelson Aug 27 '18 at 07:33
  • I think it should work; please refer to https://stackoverflow.com/a/29978189/7094520. The partitions are just stacked without any shuffle movement. – Sanket9394 Aug 27 '18 at 08:29
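
Putting this thread together, a sketch of the coalesce-first variant (variable names are illustrative, the path is hypothetical, and all aggregation is assumed done beforehand):

    import org.apache.spark.sql.functions.col

    val single = df.coalesce(1)  // one partition before filtering, per the comments

    val withoutTotal = single.filter(col("Downstream") =!= "Grand Total")
    val totalOnly    = single.filter(col("Downstream") === "Grand Total")

    // union stacks partitions in order without a shuffle, so the total row stays last
    withoutTotal.union(totalOnly)
      .coalesce(1)
      .write.option("header", "true").csv("hdfs:///tmp/report")  // hypothetical path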
0

You can try the straightforward steps below.

// isolate the "Grand Total" row, then append it after the remaining rows
val lastRowDf = df.filter("Downstream = 'Grand Total'")
val remainDf = df.filter("Downstream != 'Grand Total'")

// unionAll is a deprecated alias of union in Spark 2.x
remainDf.unionAll(lastRowDf).show
Manoj Kumar Dhakad