Without going too deep into the details: has anyone come across similar strange behavior?

I am running all of this in a Scala spark-shell on Spark 1.4.1.

I have one Spark DataFrame called "data". It is built from parquet files that are read into Spark through the sqlContext. I apply a couple of transformations to this data set, including filters, groupBys, sorts, counts, ... nothing too fancy, and all of them deterministic, with no randomness whatsoever. In this way I create two derived DataFrames, subset_1 and subset_2. At the very end I run the following type of calculation:

data.join(subset_1,"key_A").withColumnRenamed("count","count_1").join(subset_2,"key_A").filter("Feature LIKE 'inactive'").groupBy($"key_A",$"key_B").count.withColumnRenamed("count","count_2").groupBy($"key_A").count.withColumnRenamed("count","count_3").groupBy($"count_3").count.collect()

This computation runs "fine" from a syntax point of view. However, different runs of this query give different results. For example:

res82: Array[org.apache.spark.sql.Row] = Array([31,3], [32,2], [34,1], [35,1], [38,1], [42,1], [44,1], [52,1], [61,2], [81,1], [1,4933], [2,2361], [3,924], [4,441], [5,220], [6,130], [7,80], [8,59], [9,36], [10,24], [11,13], [12,12], [13,7], [14,7], [15,11], [16,6], [17,4], [18,6], [19,3], [20,5], [21,6], [22,3], [24,1], [25,1], [26,2], [27,2], [28,1], [29,1], [30,3])

and

res81: Array[org.apache.spark.sql.Row] = Array([32,3], [35,3], [43,1], [46,2], [52,1], [122,1], [145,1], [165,1], [1,3515], [2,1887], [3,836], [4,381], [5,238], [6,136], [7,84], [8,51], [9,39], [10,28], [11,28], [12,13], [13,7], [14,13], [15,8], [16,10], [17,8], [18,6], [19,4], [20,2], [21,4], [22,3], [23,4], [24,1], [25,2], [26,1], [28,3], [29,1], [30,2])

Again: same data, same code, no randomness in what I do, and still the results vary from run to run.

Any thoughts are highly appreciated.

Blaubaer

1 Answer

OK, the "problem" I encountered is related to the following question:

Spark sort by key and then group by to get ordered iterable?

Basically, one has to be very careful with combinations of sort and groupBy.

Example: Let's say you have a DataFrame df with columns person, status and date, and you'd like to get the latest status of each person. One might think of doing something like:

df.sort($"date").groupBy($"person").agg(org.apache.spark.sql.functions.last($"status"))

Unfortunately, if you .collect the result and repeat this many times, you'll find that the outcomes can differ (I guess they will only be identical if the data underlying df consists of exactly one partition). The reason is that groupBy reshuffles the rows by key, and the order in which rows coming from different partitions are merged during that shuffle is not guaranteed. The ordering by $"date" established by .sort therefore need not survive within each group, and last simply returns whichever row happens to arrive last.
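One way to avoid depending on row order at all is to compute the latest date per person with an order-independent aggregate (max) and join that back onto the original rows. The following is only a minimal sketch for the spark-shell, not necessarily the best approach: the toy rows, the ISO-formatted date strings and the helper column names p and max_date are assumptions made up for illustration.

```scala
// Assumes a spark-shell session where sqlContext is already defined.
import sqlContext.implicits._
import org.apache.spark.sql.functions.max

// Hypothetical toy data; dates are ISO strings, so max() compares them correctly.
val df = Seq(
  ("alice", "2015-01-01", "active"),
  ("alice", "2015-03-01", "inactive"),
  ("bob",   "2015-02-01", "active")
).toDF("person", "date", "status")

// max() does not depend on the order in which rows reach the aggregation.
val latest = df
  .groupBy($"person")
  .agg(max($"date").as("max_date"))
  .withColumnRenamed("person", "p") // rename to keep the join condition unambiguous

// Keep only the rows whose date equals the per-person maximum.
val result = df
  .join(latest, $"person" === $"p" && $"date" === $"max_date")
  .select($"person", $"status")

result.collect()
```

Note that if a person has several rows with the same latest date, this returns all of them, so a tie-breaking rule would still be needed in that case.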

Blaubaer