
I am trying to write a Snappy-compressed byte array (created from a protobuf) to a Hadoop Sequence File and read it back.

The array read back from Hadoop has trailing zeros. If the byte array is small and simple, removing the trailing zeros is enough to parse the protobuf back; however, for more complex objects and big sequence files, parsing fails.

Byte array example:

import org.apache.hadoop.io.{BytesWritable, NullWritable}
import org.apache.hadoop.io.compress.SnappyCodec

// write five copies of the array as a Snappy-compressed sequence file
val data = Array(1, 2, 6, 4, 2, 1).map(_.toByte)
val distData = sparkContext.parallelize(Array.fill(5)(data))
  .map(j => (NullWritable.get(), new BytesWritable(j)))

distData
  .saveAsSequenceFile(file, Some(classOf[SnappyCodec]))

val original = distData.map(kv => kv._2.getBytes).collect()

// read it back and print each record as a string of byte values
val decoded = sparkContext
  .sequenceFile[NullWritable, BytesWritable](file)
  .map(kv => kv._2.getBytes.mkString)
  .collect()

decoded.foreach(println(_))

Output:

original := 126421
decoded := 126421000

1 Answer


This problem stems from BytesWritable.getBytes, which returns a backing array that may be longer than your data. Instead, call copyBytes (as in "Write and read raw byte arrays in Spark - using Sequence File SequenceFile").
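
For example, the read side of the code in the question could look something like the following sketch (it assumes the same `sparkContext` and `file` values from the question are in scope):

val decoded = sparkContext
  .sequenceFile[NullWritable, BytesWritable](file)
  // copyBytes returns a fresh array of exactly getLength bytes,
  // without the padding that the backing array from getBytes may carry
  .map(kv => kv._2.copyBytes())
  .collect()

decoded.foreach(bytes => println(bytes.mkString))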

See HADOOP-6298: "BytesWritable#getBytes is a bad name that leads to programming mistakes" for more details.
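
If the Hadoop version on the classpath does not provide copyBytes (an assumption; it was only added in later releases), the same effect can be obtained by truncating getBytes to getLength, for example:

val decoded = sparkContext
  .sequenceFile[NullWritable, BytesWritable](file)
  // keep only the first getLength bytes of the (possibly padded) backing array
  .map(kv => kv._2.getBytes.take(kv._2.getLength))
  .collect()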

Josh Rosen