
I have input in JSON format with two fields: size (BigInteger) and data (String). Here data contains a Base64-encoded, Zstd-compressed Avro record, and the task is to decode these records. I am using spark-avro for this, but I am getting a Task not serializable exception.

Sample Data

{
  "data": "7z776qOPevPJF5/0Dv9Rzx/1/i8gJJiQD5MTDGdbeNKKT",
  "size": 231
}

Code

import java.util.Base64
import com.github.luben.zstd.Zstd
import org.apache.avro.Schema
import com.twitter.bijection.Injection
import org.apache.avro.generic.GenericRecord
import com.twitter.bijection.avro.GenericAvroCodecs
import com.databricks.spark.avro.SchemaConverters
import org.apache.spark.sql.types.StructType
import com.databricks.spark.avro.SchemaConverters._

def decode2(input: String, size: Int, avroBijection: Injection[GenericRecord, Array[Byte]], sqlType: StructType): GenericRecord = {
    val compressedGenericRecordBytes = Base64.getDecoder.decode(input)
    val genericRecordBytes = Zstd.decompress(compressedGenericRecordBytes, size)
    avroBijection.invert(genericRecordBytes).get
}

val myRdd = spark.read.format("json").load("/path").rdd

val rows = myRdd.mapPartitions {
    lazy val schema = new Schema.Parser().parse(schemaStr)
    lazy val avroBijection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
    lazy val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
    (iterator) => {
        val myList = iterator.toList
        myList.map { x =>
            val size = x(1).asInstanceOf[Long].intValue
            val data = x(0).asInstanceOf[String]
            decode2(data, size, avroBijection, sqlType)
        }.iterator
    }
}

Exception

files: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[987] at rdd at <console>:346
org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
  at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2287)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:794)
  at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1.apply(RDD.scala:793)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
  at org.apache.spark.rdd.RDD.mapPartitions(RDD.scala:793)
  ... 112 elided
Caused by: java.io.NotSerializableException: org.apache.avro.generic.GenericDatumReader
Serialization stack:
    - object not serializable (class: org.apache.avro.generic.GenericDatumReader, value: org.apache.avro.generic.GenericDatumReader@4937cd88)
    - field (class: com.twitter.bijection.avro.BinaryAvroCodec, name: reader, type: interface org.apache.avro.io.DatumReader)
    - object (class com.twitter.bijection.avro.BinaryAvroCodec, com.twitter.bijection.avro.BinaryAvroCodec@6945439c)
    - field (class: $$$$79b2515edf74bd80cfc9d8ac1ba563c6$$$$iw, name: avroBijection, type: interface com.twitter.bijection.Injection)

SO posts already tried

  1. Spark: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema

Following this post, I updated the decode2 method to take schemaStr as input and build the Schema and SqlType inside the method, roughly as in the sketch below. No change in the exception.
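
A sketch of that variant (the exact code I used may have differed slightly; the signature just replaced the Injection and StructType parameters with schemaStr):

// decode2 reworked to take the schema string, so that no Schema or
// Injection object has to be captured by the closure.
def decode2(input: String, size: Int, schemaStr: String): GenericRecord = {
    val schema = new Schema.Parser().parse(schemaStr)
    val avroBijection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
    // built inside the method as well, mirroring the original (unused) parameter
    val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
    val compressedGenericRecordBytes = Base64.getDecoder.decode(input)
    val genericRecordBytes = Zstd.decompress(compressedGenericRecordBytes, size)
    avroBijection.invert(genericRecordBytes).get
}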

  2. Use schema to convert AVRO messages with Spark to DataFrame

Used the code provided in the post to wrap the Injection in an object and then use it from the closure. This also didn't help.
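
That attempt looked roughly like this (a sketch; the object name is mine, and schemaStr is assumed to be in scope):

// Injection wrapped in an object, following the linked post, so that
// executors initialize it locally instead of receiving a serialized copy.
object AvroInjection {
    lazy val schema = new Schema.Parser().parse(schemaStr)
    lazy val injection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
}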


1 Answer


Have you tried building the schema and Injection inside the closure, so they are created on the executors instead of serialized from the driver?

val rows = myRdd.mapPartitions {
    (iterator) => {
        val myList = iterator.toList
        myList.map { x =>
            lazy val schema = new Schema.Parser().parse(schemaStr)
            lazy val avroBijection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
            lazy val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
            val size = x(1).asInstanceOf[Long].intValue
            val data = x(0).asInstanceOf[String]
            decode2(data, size, avroBijection, sqlType)
        }.iterator
    }
}
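
This sidesteps the exception because the Schema and the Injection are now constructed inside the closure, on the executor, rather than captured from the driver and serialized. Building them once per record is wasteful, though; a minimal sketch that builds them once per partition and streams the iterator (same imports and decode2 as in the question) would be:

val rows = myRdd.mapPartitions { iterator =>
    // Constructed on the executor when the partition is processed, so the
    // non-serializable GenericDatumReader never has to cross the wire.
    val schema = new Schema.Parser().parse(schemaStr)
    val avroBijection: Injection[GenericRecord, Array[Byte]] = GenericAvroCodecs.toBinary(schema)
    val sqlType = SchemaConverters.toSqlType(schema).dataType.asInstanceOf[StructType]
    iterator.map { x =>
        val size = x(1).asInstanceOf[Long].intValue
        val data = x(0).asInstanceOf[String]
        decode2(data, size, avroBijection, sqlType)
    }
}

Mapping the iterator directly also avoids materializing the whole partition with toList.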