
Avro deserialize function expects bytes and fails when applied to a DataFrame. It only works with collect(), but then the driver/master runs out of memory

Using Spark 2.3.3 with Python 3.6.8. The DataFrame is created from a Hive table SELECT whose column holds Avro-serialized messages. I am using https://github.com/Landoop/python-serializers.git, since it supports Avro deserialization via the Confluent Schema Registry on Python 3.x.

I tried applying the deserialize function to the DataFrame and it fails. It only works if I df.collect() and deserialize every entry in a for loop, or if I convert the DataFrame to an RDD and deserialize every row via rdd.map. Both cases only work in test; in prod they either OOM or run forever on 10 GB of Hive data, on a cluster of five 30 GB / 8-CPU nodes.

spark = SparkSession \
    .builder \
....
    .enableHiveSupport() \
    .getOrCreate()

df = spark.sql("SELECT * FROM table1")
unbase_df = df.select(unbase64(df.mycolumn1))

client = SchemaRegistryClient(url='1.2.3.4:1234')
serializer = MessageSerializer(client)

##attempt 1##FAILS (error below)##
decode_df = df.select(serializer.decode_message('mycolumn1'))

##attempt 2##FAILS (error below)##
decode_df_2 = df.select(serializer.decode_message(b'mycolumn1'))

##attempt 3##WORKS, but OOMs on master (driver) with high volume##
unbase_collect = unbase_df.collect()
decode_list = [serializer.decode_message(msg.mycolumn1) for msg in unbase_collect]

##attempt 4##WORKS, but runs forever##
def avrodecoder(row):
    decoded_row = serializer.decode_message(row['mycolumn1'])
    return decoded_row

decode_rdd = unbase_df.select("*").rdd.map(avrodecoder)

## After #3 or #4 works, I convert back to a DataFrame with a schema
schema = StructType([
    StructField("1column", StructType([
        .......
    ])),
    StructField("ncolumn", StringType())
])

decode_df = spark.createDataFrame(decode_rdd, schema)

Error message for attempt 1:

in decode_message(self, message)
    185             raise SerializerError("message is too small to decode")
    186
--> 187         with ContextBytesIO(message) as payload:
    188             magic, schema_id = struct.unpack('>bI', payload.read(5))
    189             if magic != MAGIC_BYTE:

TypeError: a bytes-like object is required, not 'str'

Error message for attempt 2:
.....python3.6/site-packages/datamountaineer/schemaregistry/serializers/MessageSerializer.py
in decode_message(self, message)
    188             magic, schema_id = struct.unpack('>bI', payload.read(5))
    189             if magic != MAGIC_BYTE:
--> 190                 raise SerializerError("message does not start with magic byte")
    191             decoder_func = self._get_decoder_func(schema_id, payload)
    192             return decoder_func(payload)

SerializerError: message does not start with magic byte
  1. How can I Avro-deserialize via the Confluent Schema Registry directly on a DataFrame?
  2. How can I make sure all the transformations execute only on the workers/executors?
  3. How can I make this efficient enough that it neither OOMs nor runs 5-6+ hours for <10 GB of data?
  4. Why does the 'Yarn Pending Memory' graph climb to 7+ TB or even higher in both working cases?

1 Answer


Before you can apply a plain Python function to a Column, you have to convert it to a User Defined Function (UDF):

from pyspark.sql.functions import udf

@udf(decoded_row_schema)
def avrodecoder(message):
    # the UDF receives the raw column value (bytes), not a Row
    return serializer.decode_message(message)

where decoded_row_schema describes the shape of the returned object.
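A minimal usage sketch, assuming the unbase64'd column is aliased avro_bytes (the alias and the decoded field name are hypothetical):

from pyspark.sql.functions import unbase64

# alias the decoded-from-base64 bytes so the UDF can reference them by name
unbase_df = df.select(unbase64(df.mycolumn1).alias("avro_bytes"))

# decode_message now runs row by row on the executors, not on the driver
decoded_df = unbase_df.select(avrodecoder("avro_bytes").alias("decoded"))

Note that serializer (and its SchemaRegistryClient) must be picklable, because Spark ships the UDF's closure to the executors; if it is not, construct it lazily inside the function.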

However, if you use a current (>= 2.4.0) release, that might not be necessary at all; see Pyspark 2.4.0, read avro from kafka with read stream - Python.
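One caveat if you go that route: the built-in from_avro expects a bare Avro payload, while messages produced through the Confluent Schema Registry are framed with a 5-byte header (one 0x00 magic byte plus a 4-byte schema id). A minimal sketch that strips the header first, assuming Spark >= 3.0 (where from_avro is exposed in pyspark.sql.avro.functions, and requires the spark-avro package on the classpath) and the hypothetical avro_bytes alias from above:

from pyspark.sql.functions import expr
from pyspark.sql.avro.functions import from_avro  # Spark >= 3.0

avro_type_struct = """{....}"""  # the writer schema as a JSON string

# Confluent wire format: [0x00 magic][4-byte schema id][Avro body];
# substring() is 1-based and works on BinaryType, so the body starts at byte 6
stripped_df = unbase_df.select(
    expr("substring(avro_bytes, 6, length(avro_bytes) - 5)").alias("avro_body")
)
decoded_df = stripped_df.select(from_avro("avro_body", avro_type_struct).alias("event"))

On Spark 2.4.x from_avro is Scala-only, so the same idea applies but must be wired up via the approach in the linked question.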

  • I tried your second suggestion, spark-avro on Spark 2.4.x, and defined a from_avro function similar to your example, but it errors out. unbase64_df.show(1) gives |[00 00 00 01 76 4...|, and then avro_type_struct = """{....}""" followed by avro_df = unbase64_raw_df.select(from_avro('unbase_raw_event', avro_type_struct)) fails with: py4j.protocol.Py4JJavaError: ...while calling o77.showString. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 17 in stage 24.0: Lost task 17.3 in stage 24.0 (TID 347, master, executor 2): org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1 – Enjay Jack Jun 14 '19 at 21:31
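(The Malformed data. Length is negative: -1 failure in this comment is the typical symptom of passing Confluent-framed messages straight to from_avro: the shown bytes 00 00 00 01 76 are the 5-byte Schema Registry header, which the Avro decoder misreads as data. Stripping the first 5 bytes, as in the sketch above, avoids it.)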