I found an example of encoding the rows of a Spark DataFrame to Avro in Scala.
credit: Spark Dataframe write to kafka topic in avro format?
My applications are written in Python and I'm having trouble porting the code from Scala to PySpark. Here is the Scala code:
Scala:
import org.apache.avro.Schema
import org.apache.avro.Schema.Parser
import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.io.EncoderFactory
import org.apache.avro.io.BinaryEncoder
import java.io.ByteArrayOutputStream
import org.apache.spark.sql.Row
import org.apache.spark.sql.functions._
def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
  val gr: GenericRecord = new GenericData.Record(schema)
  row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name)))
  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(gr, encoder)
  encoder.flush()
  out.close()
  out.toByteArray()
}
val SCHEMA_STRING = """
{
"namespace": "kakfa-avro.test",
"type": "record",
"name": "User",
"fields":[
{ "name": "name", "type": "string"},
{ "name": "num", "type": ["int", "null"]}
]
}
"""
val df = Seq(("Mary", 1), ("Brad", 2)).toDF("name", "num")
val schema = new Schema.Parser().parse(SCHEMA_STRING)
val encodeUDF = udf(encode(schema) _)
df.select(encodeUDF(struct(df.columns.map(column(_)):_*)).alias("value"))
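(The final select yields a DataFrame with a single binary value column, which is the shape the Kafka sink from the linked question expects.)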
PySpark:
from pyspark.sql.functions import col, struct, udf, lit
from pyspark.sql.types import ArrayType, ByteType
def encode(schema, row):
    """Avro-encode a single row against the given schema."""
    import io
    import avro.schema
    import avro.io
    import json
    avro_schema = avro.schema.Parse(json.dumps(schema))
    writer = avro.io.DatumWriter(avro_schema)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write(row, encoder)
    return bytes_writer.getvalue()
encode_udf = udf(encode, ByteType())
SCHEMA = """
{
"namespace": "kakfa-avro.test",
"type": "record",
"name": "User",
"fields":[
{ "name": "name", "type": "string"},
{ "name": "num", "type": ["int", "null"]}
]
}
"""
df = spark.createDataFrame([('Mary', 1), ('Brad', 2)], ('name', 'num'))
df.select(encode_udf(lit(SCHEMA), struct([col(c).alias(c) for c in df.columns])).alias('value')).show()
My encode function for PySpark will properly encode a single row (see the driver-side check below), but I can't seem to get it applied to the whole DataFrame. Any pointers would be greatly appreciated!
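For reference, this is the kind of check that works for me on the driver. Note that sample_row is a hypothetical stand-in dict, and that I pass the schema as a parsed dict here, so the json.dumps() inside encode just re-serializes it:

import json

sample_row = {'name': 'Mary', 'num': 1}  # stand-in for one DataFrame row
encoded = encode(json.loads(SCHEMA), sample_row)  # schema passed as a dict, not a raw string
print(type(encoded))  # <class 'bytes'>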
Exception:
Py4JJavaError: An error occurred while calling o463.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 15.0 failed 1 times, most recent failure: Lost task 2.0 in stage 15.0 (TID 38, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
process()
File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
serializer.dump_stream(func(split_index, iterator), outfile)
File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream
self.serializer.dump_stream(self._batched(iterator), stream)
File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
for obj in iterator:
File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 209, in _batched
for item in iterator:
File "<string>", line 1, in <lambda>
File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
return lambda *a: f(*a)
File "<ipython-input-1-e5915375e985>", line 13, in encode
File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 1284, in Parse
return SchemaFromJSONData(json_data, names)
File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 1255, in SchemaFromJSONData
return parser(json_data, names=names)
File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 1132, in _SchemaFromJSONString
schema = names.GetSchema(name=json_string)
File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 417, in GetSchema
avro_name = self.GetName(name=name, namespace=namespace)
File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 398, in GetName
return Name(name=name, namespace=namespace)
File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 312, in __init__
'Invalid absolute schema name: %r.' % self._fullname)
avro.schema.SchemaParseException: Invalid absolute schema name: '\n{\n "namespace": "kakfa-avro.test",\n "type": "record",\n "name": "User",\n "fields":[\n { "name": "name", "type": "string"},\n { "name": "num", "type": ["int", "null"]}\n ]\n}\n'.
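Reading the traceback, avro.schema.Parse seems to be handed my entire schema string as if it were a schema name, which makes me suspect the json.dumps(schema) call: lit(SCHEMA) already supplies raw JSON text, so dumping it again wraps it in an extra layer of quoting. A sketch of the fix I'm considering (untested; I also wonder whether the return type should be BinaryType rather than ByteType, and whether DatumWriter needs a plain dict rather than a Row):

from pyspark.sql.functions import udf
from pyspark.sql.types import BinaryType

def encode(schema, row):
    """Avro-encode one row; expects the schema as a raw JSON string."""
    import io
    import avro.schema
    import avro.io
    avro_schema = avro.schema.Parse(schema)  # schema is already JSON text, so no json.dumps()
    writer = avro.io.DatumWriter(avro_schema)
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write(row.asDict(), encoder)  # DatumWriter validates dicts, not Row objects
    return bytes_writer.getvalue()

encode_udf = udf(encode, BinaryType())  # BinaryType for byte strings; ByteType is a single byte

Does that look right, or is there something else about how the UDF receives the struct that I'm missing?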