
I want to read a CSV file (less than 50MB) from Spark and perform some join & filter operations. The rows in the CSV file are ordered by some criterion (Score in this case). I want to save the results in a single CSV file in which the original row order is kept.

Input CSV file:

Id, Score
5, 100
3, 99
6, 98
7, 95

After some join&filter operations:

val data = spark.read.option("header", "true").csv("s3://some-bucket/some-dir/123.csv")
val results = data
  .dropDuplicates($"some_col")
  .filter(x => ...)
  .join(anotherDataset, Seq("some_col"), "left_anti")

results.repartition(1).write.option("header", "true").csv("...")

Expected outputs:

Id, Score
5, 100
6, 98

(IDs 3 and 7 are filtered out)

As Spark might load the data into multiple partitions, how can I keep the original order?

coderz
  • My understanding is that if the input CSV file is small enough to be loaded into one partition (default partition size is 64MB), all the operations are done within a single partition and the order is kept. – coderz Oct 05 '20 at 03:05
  • "all the operations are done within single partition and the order is kept" not all operators are order preserving. in particular most joins, group bys, window functions, and sorts obv do not preserve order even if there is no shuffle. – Andrew Long Oct 05 '20 at 09:01
  • `dropDuplicates`, `join`, and `repartition` are all wide transformations. Since they need to shuffle data across several Spark nodes, the initial order *will not be retained*. To restore your initial order after a wide transformation you will need to reorder your dataset, by score in your case (sketched below). Here is a related [discussion](https://stackoverflow.com/questions/29284095/which-operations-preserve-rdd-order). – abiratsis Oct 05 '20 at 10:54
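
For illustration, a minimal sketch of the reorder-by-score idea from the last comment, applied to the results Dataset from the question. It assumes the Score column survives the join & filter steps, and casts it to int because the CSV is read without schema inference:

import org.apache.spark.sql.functions.col

// Restore the input ordering by re-sorting on Score (descending, as in the input file).
// Cast to int so "100" does not sort between "98" and "99" as a string.
results
  .orderBy(col("Score").cast("int").desc)
  .coalesce(1) // single output file; coalesce concatenates the sorted partitions in order
  .write.option("header", "true").csv("...")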

1 Answer


What you need to do is append a column with monotonically_increasing_id() before you perform any operations that shift the order of records, such as group-bys, joins, distinct, etc. This function lets you recreate the order of records within a partition.

"The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition"

import org.apache.spark.sql.functions.monotonically_increasing_id

val data = spark.read.option("header", "true").csv("s3://some-bucket/some-dir/123.csv")
val results = data
  .withColumn("rowId", monotonically_increasing_id())
  .dropDuplicates("some_col") // this might need to be replaced with a window function.
  .filter(x => ...)
  .join(anotherDataset, Seq("some_col"), "left_anti")

results.repartition(1)
  .orderBy("rowId")
  .write.option("header", "true").csv("...")

Note: Spark SQL doesn't include an easy built-in function to get the row number within a Spark partition, but monotonically_increasing_id fortunately does well enough.
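
As the inline comment in the snippet notes, dropDuplicates gives no guarantee about which duplicate survives. A minimal sketch of the window-function alternative, assuming the same some_col column as above, that deterministically keeps the earliest row (smallest rowId) per key:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, monotonically_increasing_id, row_number}

// Replace dropDuplicates with a window over rowId so the first-seen row per
// some_col value is the one that survives.
val w = Window.partitionBy("some_col").orderBy(col("rowId"))
val deduped = data
  .withColumn("rowId", monotonically_increasing_id())
  .withColumn("rn", row_number().over(w))
  .filter(col("rn") === 1)
  .drop("rn")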

Andrew Long