
Using Spark 2.1, I have a function that takes a DataFrame and checks to see if all records are on a given Database (Aerospike in this case).

It looks pretty much like this:

def check(df: DataFrame): Long = {
  val finalResult = df.sparkSession.sparkContext.longAccumulator("finalResult")
  df.rdd.foreachPartition(iter => {
    iter.foreach { record =>
      val success = ??? // 1 if the record is in the database, else 0
      // if success == 0, send a Slack message with the missing record
      finalResult.add(success)
    }
  })
  df.count - finalResult.value
}

So, the number of Slack messages should match the number returned by the function (the total number of missing records), but quite often it doesn't: for example, I get one Slack message but check returns 2. Rerunning it gives check = 1.

Any ideas what's happening?

shakedzy

1 Answer


Spark can run a task multiple times on the same data on different workers, which means you may be counting each success multiplied by the number of times that data was processed on any worker. Hence different passes over the same data can yield different accumulator values.

You cannot use Accumulators to get an exact count in this case. Sorry. :(
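If re-execution is the concern, one way to sidestep accumulators entirely is to derive the missing-record count from a transformation followed by a single count(), since a recomputed partition then produces the same result rather than extra accumulator updates. A minimal sketch, where existsInDb is a hypothetical stand-in for the Aerospike lookup:

```scala
// Hypothetical alternative: count missing records without an accumulator.
// existsInDb is a placeholder for the actual Aerospike membership check.
def checkWithoutAccumulator(df: DataFrame, existsInDb: Row => Boolean): Long =
  df.rdd
    .filter(record => !existsInDb(record)) // keep only records absent from the DB
    .count()                               // deterministic even if tasks are re-run
```

This trades the single pass of the accumulator version for a result that is stable under task retries; the Slack notification would still need to be sent as a side effect somewhere, which has the same at-least-once caveats as any side effect in a Spark job.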

sil
  • then why am I only getting one Slack message? if it is processed twice, then I should have two messages – shakedzy May 26 '17 at 11:18
  • Hmm, sorry, not sure then. While I haven't used dataframes too much, iter should be all the data on one partition, and not just 1 record. Are you sure that your success can only be 1 or 0? Beyond that, I have nothing – sil May 26 '17 at 11:42
  • 2
    I don't think this is right. Because he is running this in a foreach which is an action, spark guarantees the accumulator will be updated exactly once so the valuer should be correct. Accumulators are only reported on stage completion, so when running in an action, even if it needs to be rerun the partial result will not affect the final value. – puhlen May 26 '17 at 13:44