
How do you write RDD[Array[Byte]] to a file using Apache Spark and read it back again?

samthebest

2 Answers


One common problem is a weird "cannot cast" exception from BytesWritable to NullWritable. Another is that BytesWritable.getBytes does not do what its name suggests: it returns the whole backing buffer, which is your bytes followed by a run of padding zeros, and only the first getLength bytes are valid. To get exactly your bytes back, you have to use copyBytes.

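import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.spark.rdd.RDD

val rdd: RDD[Array[Byte]] = ???
// Optional compression codec; None writes the file uncompressed
val codecOpt: Option[Class[_ <: CompressionCodec]] = None

// To write
rdd.map(bytesArray => (NullWritable.get(), new BytesWritable(bytesArray)))
  .saveAsSequenceFile("/output/path", codecOpt)

// To read (copyBytes trims the buffer to the valid length)
val readBack: RDD[Array[Byte]] = sc.sequenceFile[NullWritable, BytesWritable]("/input/path")
  .map(_._2.copyBytes())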
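
To see the difference between getBytes and copyBytes without running a job, here is a minimal sketch that should paste into spark-shell (or any Scala REPL with hadoop-common on the classpath). The names payload, writable, backing, exact and manual are only for illustration. It assumes the buffer is filled through set, which I believe is close to what happens when a reader reuses one BytesWritable across records; how much padding you get is an implementation detail, so the sketch does not rely on a specific amount.

```scala
import org.apache.hadoop.io.BytesWritable

val payload: Array[Byte] = "foo".getBytes("UTF-8")

// Fill an empty BytesWritable; the internal buffer may be allocated larger than the data
val writable = new BytesWritable()
writable.set(payload, 0, payload.length)

val backing: Array[Byte] = writable.getBytes  // backing buffer, possibly zero-padded at the end
val validLength: Int = writable.getLength     // number of bytes that are real data
val exact: Array[Byte] = writable.copyBytes() // fresh array of exactly validLength bytes

// Doing by hand what copyBytes does: keep only the valid prefix
val manual: Array[Byte] = java.util.Arrays.copyOfRange(backing, 0, validLength)

println(s"backing=${backing.length} valid=$validLength exact=${exact.length}")
```

If backing comes out longer than valid, the extra bytes are the trailing zeros described above.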
samthebest
  • This post is relatively old so just wanted to know whether the answer is still up to date? Is it still necessary to use copyBytes before reading? – Sam Stoelinga Jan 17 '16 at 13:20
  • @SamStoelinga Yes I think so, it's Hadoop API that is unlikely to change. – samthebest Jan 18 '16 at 14:36
  • A more efficient alternative is to use `.getBytes()` and process only up to `.getLength()` bytes. Of course, if you strictly need an `RDD[Array[Byte]]`, this approach won't work, but you could consider an `RDD[(Array[Byte], Int)]`. – user1609012 Jun 08 '16 at 23:44
  • Can anyone post an entire working code snippet including what packages to be imported? Thanks. – Choix Mar 25 '18 at 14:25
  • @Choix - I had the same issue. Posting snippet that solved my problem as a separate answer. – Chris Bedford Jul 27 '19 at 22:17
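
The getBytes/getLength alternative mentioned in the comments can be sketched as follows. This is only an illustration under assumptions: decodeValidBytes is a hypothetical helper, and the payload is assumed to be UTF-8 text. The point is that the record is consumed inside the map, reading only the first getLength bytes of the backing buffer, so no intermediate copy of the array is made.

```scala
import java.nio.charset.StandardCharsets
import org.apache.hadoop.io.BytesWritable

// Hypothetical helper: decode only the valid prefix of the backing buffer
def decodeValidBytes(bw: BytesWritable): String =
  new String(bw.getBytes, 0, bw.getLength, StandardCharsets.UTF_8)

// Stand-alone check with a hand-built record
val sample = new BytesWritable()
val data = "foo".getBytes(StandardCharsets.UTF_8)
sample.set(data, 0, data.length)
val decoded: String = decodeValidBytes(sample)

// In a job it would be applied inside the map, for example:
// sc.sequenceFile[NullWritable, BytesWritable]("/input/path")
//   .map { case (_, bw) => decodeValidBytes(bw) }
```

Hadoop record readers reuse the same Writable object for each record, so the array returned by getBytes should not be cached or collected without copying it first; use it within the map step and keep only the derived value.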

Here is a snippet with all the required imports that you can run from spark-shell, as requested by @Choix.

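import java.nio.charset.StandardCharsets
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.NullWritable

// The output path must not already exist, or saveAsSequenceFile will fail
val path = "/tmp/path"

val rdd = sc.parallelize(List("foo"))
// Encode with an explicit charset so the round trip does not depend on platform defaults
val bytesRdd = rdd.map { str => (NullWritable.get, new BytesWritable(str.getBytes(StandardCharsets.UTF_8))) }
bytesRdd.saveAsSequenceFile(path)

// copyBytes returns only the valid bytes, without the buffer padding
val recovered = sc.sequenceFile[NullWritable, BytesWritable](path).map(_._2.copyBytes())
val recoveredAsString = recovered.map(new String(_, StandardCharsets.UTF_8))
recoveredAsString.collect()
// result is:  Array[String] = Array(foo)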
Chris Bedford