
Why does Spark throw NotSerializableException: org.apache.hadoop.io.NullWritable when working with sequence files? My code is very simple:

import org.apache.hadoop.io.{BytesWritable, NullWritable}
sc.sequenceFile[NullWritable, BytesWritable](in).repartition(1000).saveAsSequenceFile(out, None)

The exception:

org.apache.spark.SparkException: Job aborted: Task 1.0:66 had a not serializable result: java.io.NotSerializableException: org.apache.hadoop.io.NullWritable
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
samthebest

2 Answers


So it is possible to read non-serializable types into an RDD, i.e. to have an RDD of something that is not serializable (which seems counter-intuitive). But as soon as you perform an operation on that RDD that requires the objects to be shipped around, such as repartition, they need to be serializable. Moreover, it turns out that those weird SomethingWritable classes, although invented for the sole purpose of serializing things, are not actually serializable :(. So you must map them to byte arrays and back again:

sc.sequenceFile[NullWritable, BytesWritable](in)
  .map(_._2.copyBytes()).repartition(1000)
  .map(a => (NullWritable.get(), new BytesWritable(a)))
  .saveAsSequenceFile(out, None)

Also see: https://stackoverflow.com/a/22594142/1586965
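
For reference, a self-contained sketch of the same round-trip; the app name and the HDFS paths below are placeholders, not from the question:

import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.spark.{SparkConf, SparkContext}

object ResaveSequenceFile {
  def main(args: Array[String]): Unit = {
    val sc  = new SparkContext(new SparkConf().setAppName("resave-sequence-file"))
    val in  = "hdfs:///path/to/input"   // hypothetical input path
    val out = "hdfs:///path/to/output"  // hypothetical output path

    sc.sequenceFile[NullWritable, BytesWritable](in)
      .map(_._2.copyBytes())                                // unwrap to plain Array[Byte], which is serializable
      .repartition(1000)                                    // safe now: only byte arrays cross the shuffle
      .map(a => (NullWritable.get(), new BytesWritable(a))) // re-wrap just before writing
      .saveAsSequenceFile(out, None)

    sc.stop()
  }
}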

samthebest
  • If the input is sequenceFile[BytesWritable,BytesWritable], then I use "map" to convert [BytesWritable,BytesWritable] to [byte[],byte[]], and I want to reduceByKey, but it returns an error "Default partitioner cannot partition array keys". Do you have any solutions? – Marco Dinh Jun 13 '15 at 03:54
  • 1
    @MichaelDinh `.toList` on your keys, so `.map(p => (p._1.toList, p._2))` then `reduceByKey` – samthebest Jun 13 '15 at 18:23
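
A quick sketch of the workaround from the comment above: Array[Byte] keys cannot be hash-partitioned (arrays compare by reference), so convert them to List[Byte], which compares structurally, before reduceByKey. The reduce function here is just an illustrative concatenation:

import org.apache.hadoop.io.BytesWritable

val reduced = sc.sequenceFile[BytesWritable, BytesWritable](in)
  .map { case (k, v) => (k.copyBytes().toList, v.copyBytes()) } // List[Byte] keys partition correctly
  .reduceByKey((a, b) => a ++ b)                                // e.g. concatenate the value byte arrays
  .map { case (k, v) => (new BytesWritable(k.toArray), new BytesWritable(v)) }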

In Spark, if you try to use a third-party class which is not serializable, it throws a NotSerializableException. This is because of how Spark handles closures: any instance variable defined outside a transformation that you access inside the transformation has to be serialized by Spark, along with all the classes that object depends on.
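
A minimal illustration of that behaviour; the Multiplier class and the values are made up for the example:

import org.apache.spark.SparkContext

class Multiplier(val factor: Int)   // stands in for a third-party class that is not Serializable

def example(sc: SparkContext): Unit = {
  val rdd = sc.parallelize(1 to 10)
  val m = new Multiplier(3)                     // created on the driver, outside the transformation

  // rdd.map(x => x * m.factor).collect()       // would throw NotSerializableException: Multiplier,
  //                                            // because the closure has to ship `m` to the executors

  val factor = m.factor                         // copy just the needed value into a serializable local
  println(rdd.map(x => x * factor).sum())       // the closure now captures only an Int
}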

Harsh Gupta
  • Well in this instance I'm not trying to access instance variables outside a transformation operation. In fact I successfully perform a transformation operation on the 3rd party classes. Usually I see this exception exactly in the situation you describe, but this time I was somewhat baffled. What I learnt is that classes in RDDs need not be serializable, they need to be serializable if and only if the program requires their serialization to actually happen at some point. – samthebest Jun 15 '14 at 18:49