I have a bug where the output of my Spark job (submitted to Amazon EMR) indicates chunks of missing input data.
The processing of the data happens in an external process on the executors. This process requires all of the input for a given key to be present at once; partial outputs cannot be concatenated after the fact.
When I inspect the logging, I can see that RDD.groupByKey has failed to group all of the values for a given key onto the same executor.
import org.apache.spark.rdd.RDD

case class TileKey(z: Short, x: Int, y: Int)
case class ProcessResult(code: Int, message: String)

def exec(tk: TileKey, xs: Seq[String]): ProcessResult = {
  // a function that uses sys.process to call the executable on the executor,
  // passing `xs` to it and returning some kind of execution result
  ...
  ProcessResult(code = 0, message = s"Processing ${xs.size} entries for $tk")
}

val input: RDD[(TileKey, String)] = ???

input.groupByKey().map {
  case (k, xs) => exec(k, xs.toSeq)
}.map(_.message).collect().foreach(println)
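To narrow this down, I am considering logging which partition each group is processed on. This is only a diagnostic sketch (it reuses the same input RDD and assumes TaskContext is available on the executors), not something I have run yet:

import org.apache.spark.TaskContext

// Diagnostic sketch: record which partition each grouped key lands on,
// together with the number of values seen for that key in that group.
input.groupByKey().map { case (k, xs) =>
  (k, TaskContext.getPartitionId(), xs.size)
}.collect().foreach { case (k, pid, n) =>
  println(s"$k -> partition $pid, $n values")
}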
The logging shows
Processing 15441 entries for TileKey(12,702,1635)
...
Processing 1 entries for TileKey(12,702,1635)
indicating that there were duplicate key entries after groupByKey. The second lot of processing overwrites the output of the first.
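To confirm the duplication independently of the external process, a simple count comparison over the same input RDD (again, only a sketch) should show whether a key appears in more than one group:

// Sketch: groupByKey should produce one record per distinct key,
// so a mismatch here would confirm duplicate keys after the shuffle.
val distinctKeys = input.keys.distinct().count()
val groupedRecords = input.groupByKey().count()
println(s"distinct keys: $distinctKeys, grouped records: $groupedRecords")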
Is it possible that any given executor will not see all of the values associated with a given key after groupByKey?