Avro deserialize function expects a bytes-like object and fails when applied to a DataFrame; it only works after collect(), but then the driver/master runs out of memory
Using Spark 2.3.3 with Python 3.6.8. The DataFrame is created from a Hive table SELECT whose column holds an Avro-serialized message. I am then using https://github.com/Landoop/python-serializers.git, since it supports Avro deserialization against the Confluent Schema Registry on Python 3.x.
I tried applying the deserialize function to the DataFrame and it fails. It only works if I df.collect() and deserialize every entry in a for loop, or if I convert the DataFrame to an RDD and deserialize every row with rdd.map. Both of these only work in test; in prod they either OOM or run forever on ~10 GB of Hive data on a 5-node cluster of 30 GB / 8 CPU servers.
spark = SparkSession \
    .builder \
    ....
    .enableHiveSupport() \
    .getOrCreate()
df = spark.sql("SELECT * FROM table1")
unbase_df = df.select(unbase64(df.mycolumn1))
client = SchemaRegistryClient(url='1.2.3.4:1234')
serializer = MessageSerializer(client)
##attempt 1##FAILS##
decode_df = df.select(serializer.decode_message('mycolumn1'))
###->ERROR ->
##attempt 2##FAILS##
decode_df_2 = df.select(serializer.decode_message(b'mycolumn1'))
##attempt 3##WORKS BUT OOMs with high volume on the master (driver)##
unbase_collect = unbase_df.collect()
decode_list = [serializer.decode_message(msg.mycolumn1) for msg in unbase_collect]
##attempt 4##WORKS BUT RUNS FOREVER##
def avrodecoder(row):
    decoded_row = serializer.decode_message(row['mycolumn1'])
    return decoded_row
decode_rdd = unbase_df.select("*").rdd.map(avrodecoder)
## After #3 or #4 works, I convert back to a DataFrame with a schema
schema = StructType([
    StructField("1column", StructType([
        .......
    ])),
    StructField("ncolumn", StringType())
])
decode_df = spark.createDataFrame(decode_rdd, schema)
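For completeness, this is roughly the mapPartitions variant of attempt #4 I have also been considering, so that the schema-registry client and serializer are built once per partition on the executor instead of being pickled from the driver. It is only a sketch under those assumptions (placeholder URL, positional access to the single unbase64'd column), not something I have verified at volume:

```python
# Sketch of attempt #4 with mapPartitions: construct the client/serializer inside
# each partition so they live on the executor, then decode all rows of that partition.
def avrodecode_partition(rows):
    client = SchemaRegistryClient(url='1.2.3.4:1234')   # placeholder URL
    serializer = MessageSerializer(client)
    for row in rows:
        # row[0] is the unbase64'd BinaryType column coming from unbase_df
        yield serializer.decode_message(bytes(row[0]))

decode_rdd = unbase_df.rdd.mapPartitions(avrodecode_partition)
decode_df = spark.createDataFrame(decode_rdd, schema)
```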
Error message in case of attempt #1:

```
...MessageSerializer.py in decode_message(self, message)
    185             raise SerializerError("message is too small to decode")
    186
--> 187         with ContextBytesIO(message) as payload:
    188             magic, schema_id = struct.unpack('>bI', payload.read(5))
    189             if magic != MAGIC_BYTE:

TypeError: a bytes-like object is required, not 'str'
```

Error message in case of attempt #2:

```
.....python3.6/site-packages/datamountaineer/schemaregistry/serializers/MessageSerializer.py in decode_message(self, message)
    188         magic, schema_id = struct.unpack('>bI', payload.read(5))
    189         if magic != MAGIC_BYTE:
--> 190             raise SerializerError("message does not start with magic byte")
    191         decoder_func = self._get_decoder_func(schema_id, payload)
    192         return decoder_func(payload)

SerializerError: the message does not start with a magic byte
```
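As far as I can tell from the two tracebacks, attempt #1 hands decode_message the literal string 'mycolumn1' (hence the bytes-like TypeError) and attempt #2 hands it the literal bytes b'mycolumn1', which of course don't start with the Confluent magic byte; in neither case does the actual column value reach the serializer. What I was hoping to be able to write is something like the following UDF-based sketch that keeps the decoding on the executors; the lazily built serializer, the JSON return type and the placeholder URL are my own assumptions, and I have not verified it:

```python
# Hypothetical UDF sketch: decode each message on the executors and return JSON,
# assuming the datamountaineer package is installed on every worker and the
# decoded Avro record is JSON-serializable.
import json
from pyspark.sql.functions import udf, unbase64
from pyspark.sql.types import StringType

_serializer = None  # built lazily, once per executor Python process

def decode_to_json(payload):
    global _serializer
    if _serializer is None:
        _serializer = MessageSerializer(SchemaRegistryClient(url='1.2.3.4:1234'))  # placeholder URL
    if payload is None:
        return None
    # payload is the unbase64'd value and arrives as a bytearray inside the UDF
    return json.dumps(_serializer.decode_message(bytes(payload)))

decode_udf = udf(decode_to_json, StringType())
decoded_df = df.select(decode_udf(unbase64(df.mycolumn1)).alias("decoded_json"))
```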
- How can I Avro-deserialize via the Confluent Schema Registry directly on a DataFrame?
- How can I make sure all of the transformations are executed only on the workers/executors?
- How can I make this efficient enough that it doesn't OOM or run for 5-6+ hours on less than 10 GB of data?
- I also don't understand why the 'Yarn Pending Memory' graph climbs to 7+ TB (or even higher) in both of the working cases.