1

My Final RDD looks like this

FinalRDD.collect()

Array[(Int, Seq[Iterable[Int]])] = Array((1,List(List(97), List(98), List(99), List(100))), (2,List(List(97, 98), List(97, 99), List(97, 101))), (3,List(List(97, 98, 99),List(99, 102, 103))))

I would like to write this RDD to a text file in the following format

('97'), ('98'), ('100')

('97', '98'), ('97', '99'), List(97, 101)

('97','98', '99'), ('97', '99', '101')

I found many websites suggesting PrintWriter class from java.io as one option to achieve this. Here is the code that I have tried.

val writer = new PrintWriter(new File(outputFName))

def writefunc(chunk : Seq[Iterable[Int]])
{
  var n=chunk
  print("inside write func")
  for(i <- 0 until n.length)
  {
    writer.print("('"+n(i)+"')"+", ")

  }
 }

finalRDD.mapValues(list =>writefunc(list)).collect()

I ended up getting task serializble error shown below

finalRDD.mapValues(list =>writefunc(list)).collect()
org.apache.spark.SparkException: Task not serializable
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:340)
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:330)
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:156)
at org.apache.spark.SparkContext.clean(SparkContext.scala:2294)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:758)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1.apply(PairRDDFunctions.scala:757)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
at org.apache.spark.rdd.PairRDDFunctions.mapValues(PairRDDFunctions.scala:757)
... 50 elided
Caused by: java.io.NotSerializableException: java.io.PrintWriter
Serialization stack:
- object not serializable (class: java.io.PrintWriter, value:   java.io.PrintWriter@b0c0abe)
- field (class: $iw, name: writer, type: class java.io.PrintWriter)
- object (class $iw, $iw@31afbb30)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@672ca5ae)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@528ac6dd)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@b772a0e)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@7b11bb43)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@94c2342)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@2bacf377)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@718e1924)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@6d112a64)
- field (class: $iw, name: $iw, type: class $iw)
- object (class $iw, $iw@5747e8e4)
- field (class: $line411.$read, name: $iw, type: class $iw)
- object (class $line411.$read, $line411.$read@59a0616c)
- field (class: $iw, name: $line411$read, type: class $line411.$read)
- object (class $iw, $iw@a375f8f)
- field (class: $iw, name: $outer, type: class $iw)
- object (class $iw, $iw@4e3978ff)
- field (class: $anonfun$1, name: $outer, type: class $iw)
- object (class $anonfun$1, <function1>)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:337)
... 59 more

I'm still in the phase of learning scala. Could someone suggest me, how to write "Seq[Iterable[Int]]" object to a text file

Eswar
  • 309
  • 1
  • 3
  • 10
  • 1
    I think there is a big potential for confusion between what Spark can do vs plain Scala for your use case. for small RDDs you could collect the result to memory via `finalRDD.collect()` but don't use any operations on RDD after that like `mapValues()` or `map()`. Then your question would be 100% about Scala and not Spark – IgorK Mar 02 '18 at 11:24
  • @IgorK Thanks for your valuable feedback. I will try to adapt this to my code. – Eswar Mar 02 '18 at 11:30
  • 2
    If your intention is to make it work for potentially huge files and fully utilize Spark for saving RDD - it's a whole different story (the simplest method for csv files would be `finalRDD.saveAsTextFile(...)`, but look at how `DataFrame`s are saved too `finalRDD.toDF().write.format(...).save(...)`). In this case, redefining details of serialization format is probably a bad idea - rely on standard means. if you need text - CSV is supported https://github.com/databricks/spark-csv – IgorK Mar 02 '18 at 11:31
  • so it would be helpful to know what are you trying to do - 1) save small-ish result in Scala code in any working way OR 2) utilize Spark in a scalable way (so it can later save data from multiple nodes, in proper data formats etc.) – IgorK Mar 02 '18 at 11:34
  • @IgorK I would say I'm working on small-ish dataset and hence would like to save the final result using Scala code – Eswar Mar 02 '18 at 11:52
  • If the dataset is so small why would you make more complex using spark and distributed mode? – koiralo Mar 02 '18 at 12:01
  • @ShankarKoirala I'm trying to implement an algorithm with a small dataset. One of the goals is to try and achieve minimum overall run time. Moreover, the nature of the algorithms requires processing data by partition. I hope that helps to understand the background – Eswar Mar 02 '18 at 12:11

2 Answers2

2

Neither you need to collect the rdd nor you need PrintWriter apis.

A simple combinations of map and mkString functions should do the trick for you and finally just use saveAsTextFile api to save the rdd to text file.

finalRDD.map(x => x._2.map("("+_.mkString(", ")+")").mkString(", ")).saveAsTextFile("path to output text file")

You should have your text file with following text lines

(97), (98), (99), (100)
(97, 98), (97, 99), (97, 101)
(97, 98, 99), (99, 102, 103)
Ramesh Maharjan
  • 41,071
  • 6
  • 69
  • 97
  • wouldn't this cause interesting results on non-local Spark though? :) like every node writing its share in their local FS similar to https://stackoverflow.com/a/48712134/44647 ? – IgorK Mar 02 '18 at 17:55
  • That should be fine too as the above code is converting each row to string. And I don't think each row will be mapped by splitting in the cluster. – Ramesh Maharjan Mar 02 '18 at 17:59
  • just made several tests on local and Dataproc with Spark 2.0.2. When RDD has multiple partitions `saveAsTextFile` saves multiple files (fix with `.repartition(1)`) and mangles file name (the path parameter is treated as a directory and it creates files with names similar to `part-00000` with actual data). Both issues are mentioned in linked SO question ^. This happens even in local Spark. Another issue is that Spark with separate driver and worker nodes will end up creating files in local FS of *worker* nodes (when "file:///..." path is specified). Will link logs in a moment. – IgorK Mar 02 '18 at 21:03
  • updated my answer with logs, but if local Spark + hacky mangled names on the local filesystem are fine - your solution would work too. – IgorK Mar 02 '18 at 21:25
  • @RameshMaharjan Thanks a lot. Your answer helped me a lot to implement the algorithm that I wanted – Eswar Mar 05 '18 at 22:33
1

Since you don't really want to let Spark hand saving data and collected result is expected to be small - just do finalRDD.collect() and apply any of the solutions to print output to file like https://stackoverflow.com/a/4608061/44647 :

// taken from https://stackoverflow.com/a/4608061/44647
def printToFile(fileName: String)(op: java.io.PrintWriter => Unit) {
  val p = new java.io.PrintWriter(fileName)
  try { op(p) } finally { p.close() }
}

val collectedData: Seq[(Int, Seq[Iterable[Int]])] = finalRDD.collect()
val output: Seq[String] = collectedData
  .map(_._2) // use only second part of tuple Seq[Iterable[Int]]
  .map { seq: Seq[Iterable[Int]] =>
     // render inner Iterable[Int] as String in ('1', '2', '3') format
     val inner: Seq[String] = seq.map("(" + _.map(i => s"'$i'").mkString(", ") + ")")
     inner.mkString(", ")
  }

printToFile(outputFileName) { p => output.foreach(p.println) }

If your RDD changes schema - type of collected collection will change and you will have to adjust this code.

Test output from your example collected data (since there is no context to reconstruct RDD):

('97'), ('98'), ('99'), ('100')
('97', '98'), ('97', '99'), ('97', '101')
('97', '98', '99'), ('99', '102', '103')

UPDATE: the other answer https://stackoverflow.com/a/49074625/44647 is correct that you can generate text as RDD[String] and save file(s) somewhere via Spark rdd.saveAsTextFile(...). But there are several potential issues with this approach (also covered in how to make saveAsTextFile NOT split output into multiple file? ):

1) RDD with multiple partitions will generate multiple files (you have to do something like rdd.repartition(1) to at least ensure one file with data is generated)

2) File names are mangled (the path parameter is treated as a directory name) and a bunch of temp junk is generated too. In the example below RDD got split into 4 files part-00000...part-00003 because RDD had 4 partitions - illustrates 1) + 2):

scala> sc.parallelize(collectedData, 4).map(x => x._2.map("("+_.mkString(", ")+")").mkString(", ")).saveAsTextFile("/Users/igork/testdata/test6")

 ls -al  ~/testdata/test6
total 64
drwxr-xr-x  12 igork  staff  408 Mar  2 11:40 .
drwxr-xr-x  10 igork  staff  340 Mar  2 11:40 ..
-rw-r--r--   1 igork  staff    8 Mar  2 11:40 ._SUCCESS.crc
-rw-r--r--   1 igork  staff    8 Mar  2 11:40 .part-00000.crc
-rw-r--r--   1 igork  staff   12 Mar  2 11:40 .part-00001.crc
-rw-r--r--   1 igork  staff   12 Mar  2 11:40 .part-00002.crc
-rw-r--r--   1 igork  staff   12 Mar  2 11:40 .part-00003.crc
-rw-r--r--   1 igork  staff    0 Mar  2 11:40 _SUCCESS
-rw-r--r--   1 igork  staff    0 Mar  2 11:40 part-00000
-rw-r--r--   1 igork  staff   24 Mar  2 11:40 part-00001
-rw-r--r--   1 igork  staff   30 Mar  2 11:40 part-00002
-rw-r--r--   1 igork  staff   29 Mar  2 11:40 part-00003

3) When you run on Spark cluster with multiple nodes (specifically when worker and driver are on different hosts) if given local path it will generate files on local filesystems of worker nodes (and can disperse part-0000* files between different worker nodes). Example run on Google Dataproc with 4 worker hosts is provided below. To overcome this you will want to use a real distributed filesystem like HDFS or blob storage like S3 or GCS and get generated files from there. Otherwise, it's up to you to retrieve multiple files from worker nodes.

Test job had main() with code:

val collectedData: Seq[(Int, Seq[Seq[Int]])] =
  Array((1, List(List(97), List(98), List(99), List(100))),
    (2,List(List(97, 98), List(97, 99), List(97, 101))),
    (3,List(List(97, 98, 99),List(99, 102, 103))))
val rdd = sc.parallelize(collectedData, 4)

val uniqueSuffix = UUID.randomUUID()

// expected to run on Spark executors
rdd.saveAsTextFile(s"file:///tmp/just-testing/$uniqueSuffix/test3")

// expected to run on Spark driver and find NO files
println("Files on driver:")
val driverHostName = InetAddress.getLocalHost.getHostName
Files.walk(Paths.get(s"/tmp/just-testing/$uniqueSuffix/test3"))
  .toArray.map(driverHostName + " : " + _).foreach(println)

// just a *hack* to list files on every executor and get output to the driver
// PLEASE DON'T DO THAT IN PRODUCTION CODE
val outputRDD = rdd.mapPartitions[String] { _ =>
  val hostName = InetAddress.getLocalHost.getHostName
  Seq(Files.walk(Paths.get(s"/tmp/just-testing/$uniqueSuffix/test3"))
    .toArray.map(hostName + " : " + _).mkString("\n")).toIterator
}

// expected to list files as was seen on executor nodes - multiple files should be present
println("Files on executors:")
outputRDD.collect().foreach(println)

Note how files are split between different hosts and driver dp-igork-test-m has no useful files at all because they are on worker nodes dp-igork-test-w-*. The output of test job (changed hostnames a bit for anonymity):

18/03/02 20:54:00 INFO org.spark_project.jetty.util.log: Logging initialized @1950ms

18/03/02 20:54:00 INFO org.spark_project.jetty.server.Server: jetty-9.2.z-SNAPSHOT

18/03/02 20:54:00 INFO org.spark_project.jetty.server.ServerConnector: Started ServerConnector@772485dd{HTTP/1.1}{0.0.0.0:4172}

18/03/02 20:54:00 INFO org.spark_project.jetty.server.Server: Started @2094ms

18/03/02 20:54:00 INFO com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystemBase: GHFS version: 1.6.3-hadoop2

18/03/02 20:54:01 INFO org.apache.hadoop.yarn.client.RMProxy: Connecting to ResourceManager at dp-igork-test-m/10.142.0.2:8032

18/03/02 20:54:03 INFO org.apache.hadoop.yarn.client.api.impl.YarnClientImpl: Submitted application application_1520023415468_0003

18/03/02 20:54:07 WARN org.apache.spark.SparkContext: Use an existing SparkContext, some configuration may not take effect.

Files on driver:

dp-igork-test-m : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3

dp-igork-test-m : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/._SUCCESS.crc

dp-igork-test-m : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_SUCCESS

Files on executors:

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000003_3

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000002_2

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00002

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00003

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00003.crc

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00002.crc

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00001.crc

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00000

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000001_1

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000000_0

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00001

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00000.crc

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000003_3

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000002_2

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00002

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00003

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00003.crc

dp-igork-test-w-1 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00002.crc

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00001.crc

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00000

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000001_1

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/_temporary/0/_temporary/attempt_201803022054_0000_m_000000_0

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/part-00001

dp-igork-test-w-0 : /tmp/just-testing/50a5710f-5ff2-4145-8922-1befaf5b6740/test3/.part-00000.crc

18/03/02 20:54:12 INFO org.spark_project.jetty.server.ServerConnector: Stopped ServerConnector@772485dd{HTTP/1.1}{0.0.0.0:4172}
IgorK
  • 886
  • 9
  • 26
  • Thank you for the detailed explanation. Your answer helped me a lot to implement the algorithm that I wanted – Eswar Mar 05 '18 at 22:32