I'm using Spark SQL to run a query over my dataset. The result of the query is pretty small, but it is still partitioned.

I would like to coalesce the resulting DataFrame and order the rows by a column. I tried

DataFrame result = sparkSQLContext.sql("my sql").coalesce(1).orderBy("col1");
result.toJSON().saveAsTextFile("output");

I also tried

DataFrame result = sparkSQLContext.sql("my sql").repartition(1).orderBy("col1");
result.toJSON().saveAsTextFile("output");

In both cases the output file is ordered only in chunks (i.e. each partition is sorted, but the DataFrame is not sorted as a whole). For example, instead of

1, value
2, value
4, value
4, value
5, value
5, value
...

I get

2, value
4, value
5, value
-----------> partition boundary
1, value
4, value
5, value

  1. What is the correct way to get an absolute ordering of my query result?
  2. Why isn't the DataFrame being coalesced into a single partition?

zero323
fo_x86
  • As you know, repartition is lazy and will not be executed until the next action. I suggest inserting a "count" between the ordering and the repartitioning, to make sure the repartitioning happens before the ordering rather than together with it. Let me know the result. – Abdulrahman Jul 31 '15 at 09:02
  • I've tried adding `count` as `result = result.coalesce(1); result.count(); result.orderBy("col1")` with no luck... – fo_x86 Aug 04 '15 at 00:34
  • Check out this post: http://stackoverflow.com/questions/24371259/how-to-make-saveastextfile-not-split-output-into-multiple-file – IrishDog Dec 29 '15 at 14:44
  • @fo_x86: you should use coalesce or repartition after converting the DF to JSON then save as text file. That should solve your problem. – Shankar Jun 01 '17 at 02:47

2 Answers

I want to mention a couple of things here.

1. The source code shows that orderBy internally calls the sort API with global ordering set to true. So the lack of ordering in the output suggests that the ordering was lost while writing to the target. My point is that a call to orderBy always requests a global order.

2. Using a drastic coalesce, such as forcing a single partition as in your case, can be really dangerous, and I would recommend against it. The source code suggests that calling coalesce(1) can cause upstream transformations to run on a single partition as well, which would be brutal performance-wise.

3. You seem to expect orderBy to be executed on a single partition. I don't think that expectation holds: it would make Spark a rather silly distributed framework.

Community, please let me know whether you agree or disagree with these statements.

How are you reading the data back from the output, anyway?

Maybe the output actually contains sorted data, and the transformations or actions you perform to read it back are responsible for the lost order.
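
One way to test that hypothesis is to read the output lines back in file order and check whether the leading key ever decreases. The following is a minimal sketch in plain Java (no Spark involved). It assumes the simplified "key, value" line shape from the question's example rather than the real JSON output, and the class and method names (OrderCheck, keyOf, isGloballySorted) are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class OrderCheck {
    // Parses the leading numeric key from a line such as "4, value".
    // Assumption: every line starts with an integer key followed by a comma.
    public static long keyOf(String line) {
        return Long.parseLong(line.substring(0, line.indexOf(',')).trim());
    }

    // Returns true when the keys never decrease from one line to the next.
    public static boolean isGloballySorted(List<String> lines) {
        for (int i = 1; i < lines.size(); i++) {
            if (keyOf(lines.get(i - 1)) > keyOf(lines.get(i))) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // The two listings from the question, in the order they appear.
        List<String> expected = Arrays.asList(
            "1, value", "2, value", "4, value", "4, value", "5, value", "5, value");
        List<String> observed = Arrays.asList(
            "2, value", "4, value", "5, value", "1, value", "4, value", "5, value");
        System.out.println(isGloballySorted(expected)); // true
        System.out.println(isGloballySorted(observed)); // false
    }
}
```

If the check passes on the raw files but fails after your own read step, the order is being lost on the reading side rather than by orderBy.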

JavaPlanet

The orderBy will produce new partitions after your coalesce. To have a single output partition, reorder the operations...

Dataset<Row> result = spark.sql("my sql").orderBy("col1").coalesce(1);
result.write().json("results.json");

As @JavaPlanet mentioned, for really big data you don't want to coalesce into a single partition. It will drastically reduce your level of parallelism.
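
If you skip the coalesce for that reason, the output stays split across several part files, and whoever consumes it has to stitch them back together. Below is a minimal sketch in plain Java (no Spark) that reads the part files in file-name order. It rests on an assumption worth verifying for your Spark version: that a global orderBy leaves the data range-partitioned, so part-00000 holds the smallest keys, part-00001 the next range, and so on. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PartFileReader {
    // Reads every "part-*" file in outputDir in lexicographic file-name order
    // and returns all lines as a single list. Marker files such as _SUCCESS
    // are skipped because they do not match the prefix.
    public static List<String> readInPartOrder(Path outputDir) throws IOException {
        List<Path> parts;
        try (Stream<Path> files = Files.list(outputDir)) {
            parts = files
                .filter(p -> p.getFileName().toString().startsWith("part-"))
                .sorted(Comparator.comparing((Path p) -> p.getFileName().toString()))
                .collect(Collectors.toList());
        }
        List<String> lines = new ArrayList<>();
        for (Path part : parts) {
            lines.addAll(Files.readAllLines(part)); // UTF-8 by default
        }
        return lines;
    }
}
```

This only works for output on a local or mounted file system, and only preserves the order if the assumption about range partitioning holds.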

Doug Bateman