I have a Spark RDD where each element is a tuple in the form (key, input). I would like to use the pipe method to pass the inputs to an external executable and generate a new RDD of the form (key, output). I need the keys for correlation later.

Here is an example using the spark-shell:

val data = sc.parallelize(
  Seq(
    ("file1", "one"),
    ("file2", "two two"),
    ("file3", "three three three")))

// Incorrectly processes the data (calls toString() on each tuple)
data.pipe("wc")

// Loses the keys, generates extraneous results
data.map( elem => elem._2 ).pipe("wc")
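
To make the first problem concrete, here is a small plain-Scala sketch (no Spark needed) of the line the external process would receive for one element, assuming, as the comment above says, that each element is written using its toString form. The names `elem`, `pipedLine` and `valueOnly` are only illustrative.

```scala
// One element of the RDD from the example above.
val elem: (String, String) = ("file2", "two two")

// The string form of the whole tuple: key and input wrapped in parentheses.
val pipedLine: String = elem.toString

// What the executable should actually see: the input alone.
val valueOnly: String = elem._2

println(pipedLine) // (file2,two two)
println(valueOnly) // two two
```

So `wc` would count the characters of the key and the punctuation as well, which is why the first attempt gives wrong numbers.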

Thanks in advance.

2 Answers

The solution with map is not correct: map is not guaranteed to preserve partitioning, so a subsequent zip will fail. You need to use mapValues to preserve the partitioning of the initial RDD.

data.zip( 
  data.mapValues{ _.toString }.pipe("my_executable")
).map { case ((key, input), output) => 
  (key, output)
}
geoalgo
  • Note that if the data RDD has no partitioner, this will fail too (mapValues keeps the partitioner only if one is present). Of course, repartitioning before running this snippet will solve the issue. – ilcord Aug 10 '17 at 14:04

Assuming you cannot pass the key in and out of the executable, this might work:

rdd
  .map(x => x._1)                // keys only
  .zip(
    rdd
      .map(x => x._2)            // inputs only
      .pipe("my_executable"))    // one output line expected per input

Please be aware that this can be fragile, and it will definitely break if your executable does not produce exactly one line of output for each input record.

Ivan Balashov
  • This generates a SparkException: `Can only zip RDDs with same number of elements in each partition`. The problem seems to be that empty partitions are created, which produce an empty result when piped to the executable. Similar to this question: http://stackoverflow.com/q/33277567 – jollybugbear Jun 29 '16 at 21:51
  • @jollybugbear Does the executable produce exactly one line for each input? – Ivan Balashov Jun 30 '16 at 05:53
  • @Ivan You can produce several lines per input. It will then come back as a string "List(line1,line2, ...)". – geoalgo Jun 30 '16 at 09:36
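
One way to sidestep the zip constraint discussed in the comments above is to keep each key next to its input inside a partition and run the executable per partition, instead of zipping two RDDs. The following is only a minimal sketch under stated assumptions: `pipeKeepingKeys` is a hypothetical helper (not a Spark API), it assumes the command emits exactly one output line per input line, and it buffers a whole partition in memory. It uses `scala.sys.process` from the standard library rather than `RDD.pipe`.

```scala
import java.io.ByteArrayInputStream
import java.nio.charset.StandardCharsets.UTF_8
import scala.sys.process._

// Hypothetical helper: pipes the inputs of ONE partition through `command`
// and re-attaches the keys by position.
// Assumption: the command writes exactly one output line per input line.
def pipeKeepingKeys(
    records: Iterator[(String, String)],
    command: Seq[String]): Iterator[(String, String)] = {
  val buffered = records.toVector
  if (buffered.isEmpty) {
    // Empty partition: do not launch the process at all.
    Iterator.empty
  } else {
    // Feed the inputs (values only) to the process, one per line.
    val stdin = new ByteArrayInputStream(
      buffered.map(_._2).mkString("", "\n", "\n").getBytes(UTF_8))
    // `!!` runs the command, returns its stdout, and throws on a non-zero exit code.
    val outputLines = (Process(command) #< stdin).!!.linesIterator.toVector
    require(
      outputLines.size == buffered.size,
      s"expected ${buffered.size} output lines, got ${outputLines.size}")
    // Pair each key with the output line at the same position.
    buffered.iterator.map(_._1).zip(outputLines.iterator)
  }
}
```

With the question's data this could be applied as `data.mapPartitions(pipeKeepingKeys(_, Seq("my_executable")))`. Because keys and outputs are paired within a single partition, empty partitions no longer cause a mismatch, although an executable that emits a different number of lines than it reads will still fail the `require` check.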