
In my application, I'm using the parallelize method to save an Array to a file.

The code is as follows:

val sourceRDD = sc.textFile(inputPath + "/source")
val destinationRDD = sc.textFile(inputPath + "/destination")

val source_primary_key = sourceRDD.map(rec => (rec.split(",")(0).toInt, rec))
val destination_primary_key = destinationRDD.map(rec => (rec.split(",")(0).toInt, rec))

val extra_in_source = source_primary_key.subtractByKey(destination_primary_key)
val extra_in_destination = destination_primary_key.subtractByKey(source_primary_key)

val source_subtract = source_primary_key.subtract(destination_primary_key)

val Destination_subtract = destination_primary_key.subtract(source_primary_key)

val exact_bestmatch_src = source_subtract.subtractByKey(extra_in_source).sortByKey(true).map(rec => (rec._2))
val exact_bestmatch_Dest = Destination_subtract.subtractByKey(extra_in_destination).sortByKey(true).map(rec => (rec._2))

val exact_bestmatch_src_p = exact_bestmatch_src.map(rec => (rec.split(",")(0).toInt))

val primary_key_distinct = exact_bestmatch_src_p.distinct.toArray()

for (i <- primary_key_distinct) {
  var dummyVar: String = ""
  val src = exact_bestmatch_src.filter(line => line.split(",")(0).toInt.equals(i))
  var dest = exact_bestmatch_Dest.filter(line => line.split(",")(0).toInt.equals(i)).toArray

  for (print1 <- src) {
    var sourceArr: Array[String] = print1.split(",")
    var exactbestMatchCounter: Int = 0
    var index: Array[Int] = new Array[Int](1)

    println(print1 + "source")

    for (print2 <- dest) {
      var bestMatchCounter = 0
      var i: Int = 0

      println(print1 + "source + destination" + print2)

      for (i <- 0 until sourceArr.length) {
        if (print1.split(",")(i).equals(print2.split(",")(i))) {
          bestMatchCounter += 1
        }
      }
      if (exactbestMatchCounter < bestMatchCounter) {
        exactbestMatchCounter = bestMatchCounter
        dummyVar = print2
        index +:= exactbestMatchCounter // 9, 8, 9
      }
    }

    var z = index.zipWithIndex.maxBy(_._1)._2

    if (exactbestMatchCounter >= 0) {
      var samparr: Array[String] = new Array[String](4)
      samparr +:= print1 + "  BEST_MATCH  " + dummyVar
      var deletedest: Array[String] = new Array[String](1)
      deletedest = dest.take(z) ++ dest.drop(1)
      dest = deletedest
      val myFile = sc.parallelize(samparr).saveAsTextFile(outputPath)
    }
  }
}

I have used the parallelize method, and I even tried the method below to save it as a file:

val myFile = sc.textFile(samparr.toString())
val finalRdd = myFile
finalRdd.coalesce(1).saveAsTextFile(outputPath)

but it keeps throwing the error:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable

Vickyster

2 Answers


You can't treat an RDD like a local collection. All operations on it run across a distributed cluster, so every function you pass to an RDD operation must be serializable.

The line

for (print1 <- src) {

Here you are iterating over the RDD src; everything inside the loop must be serializable, as it will be run on the executors.

Inside it, however, you call sc.parallelize() while still within that loop. SparkContext is not serializable. Working with RDDs and the SparkContext are things you do on the driver, and you cannot do them within an RDD operation.
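As a minimal sketch of the distinction (assuming sc, an already-built rdd of strings, and outputPath are in scope; the path suffix is made up):

// On the driver: creating and saving RDDs is fine here
sc.parallelize(Seq("a", "b", "c")).saveAsTextFile(outputPath + "/driver_side")

// On the executors: this closure is serialized and shipped to the cluster,
// so it must not capture the SparkContext
rdd.foreach { line =>
  // sc.parallelize(Seq(line))  // would throw "Task not serializable"
  println(line)                 // prints on an executor, not on the driver
}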

I'm not entirely sure what you are trying to accomplish, but it looks like some sort of hand-coded join operation between the source and destination. You can't work with loops over RDDs the way you can with local collections. Make use of APIs such as map, join, and groupBy to build your final RDD, then save that; a rough sketch follows.
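For example, reusing the keyed RDDs from the question (the field-by-field scoring and the per-record grouping are my reading of your hand-written loop, not a drop-in replacement):

// Pair every source record with every destination record sharing its key,
// score each pair by the number of matching comma-separated fields,
// and keep the best-scoring destination per source record.
val bestMatches = source_primary_key
  .join(destination_primary_key)            // RDD[(Int, (srcLine, dstLine))]
  .map { case (key, (src, dst)) =>
    val score = src.split(",").zip(dst.split(",")).count { case (a, b) => a == b }
    ((key, src), (score, src + "  BEST_MATCH  " + dst))
  }
  .reduceByKey((a, b) => if (a._1 >= b._1) a else b)
  .map { case (_, (_, line)) => line }

bestMatches.saveAsTextFile(outputPath)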

If you absolutely feel you must loop over the RDD like this, then you can't use sc.parallelize().saveAsTextFile(). Instead, open an output stream using the Hadoop file API and write your array to the file manually, as in the sketch below.
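One way to read that suggestion, writing on the driver after the matches have been gathered into a local array such as samparr (the file name is made up for illustration):

import java.io.PrintWriter
import org.apache.hadoop.fs.{FileSystem, Path}

// Open a stream through the Hadoop FileSystem API (works for HDFS paths too)
// and write the lines out manually, without creating another RDD.
val fs = FileSystem.get(sc.hadoopConfiguration)
val out = new PrintWriter(fs.create(new Path(outputPath + "/best_matches.txt")))
try {
  samparr.foreach(line => out.println(line))
} finally {
  out.close()
}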

puhlen

Finally, this piece of code helped me save the array to a file:

import java.io.PrintWriter

new PrintWriter(outputPath) { write(array.mkString(" ")); close() }
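Note that PrintWriter writes to the local filesystem of the driver, so this works only for data that is already in a local collection and only for a local (non-HDFS) output path.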
Vickyster