
I found an example of encoding the rows of a Spark DataFrame to Avro in Scala.

credit: Spark Dataframe write to kafka topic in avro format?

My applications are written in Python, and I'm having trouble porting the code from Scala to PySpark. Here is the Scala code:

Scala:

import org.apache.avro.Schema
import org.apache.avro.Schema.Parser

import org.apache.avro.generic.GenericData
import org.apache.avro.generic.GenericRecord
import org.apache.avro.specific.SpecificDatumWriter


import org.apache.avro.io.EncoderFactory
import org.apache.avro.io.BinaryEncoder

import java.io.ByteArrayOutputStream

import org.apache.spark.sql.Row

import org.apache.spark.sql.functions._

def encode(schema: org.apache.avro.Schema)(row: Row): Array[Byte] = {
  val gr: GenericRecord = new GenericData.Record(schema)
  row.schema.fieldNames.foreach(name => gr.put(name, row.getAs(name)))

  val writer = new SpecificDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(gr, encoder)
  encoder.flush()
  out.close()

  out.toByteArray()
}

val SCHEMA_STRING = """
{
    "namespace": "kakfa-avro.test",
     "type": "record",
     "name": "User",
     "fields":[
         {   "name": "name",  "type": "string"},
         {   "name": "num", "type": ["int", "null"]}
     ]
}
"""


val df = Seq(("Mary", 1), ("Brad", 2)).toDF("name", "num")

val schema = new Schema.Parser().parse(SCHEMA_STRING)
val encodeUDF = udf(encode(schema) _)

df.select(encodeUDF(struct(df.columns.map(column(_)):_*)).alias("value"))

PySpark:

from pyspark.sql.functions import col, struct, udf, lit
from pyspark.sql.types import ArrayType, ByteType

def encode(schema, row):
    """
    """
    import io
    import avro.schema
    import avro.io
    import json

    avro_schema = avro.schema.Parse(json.dumps(schema))
    writer = avro.io.DatumWriter(avro_schema)

    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    writer.write(row, encoder)

    return bytes_writer.getvalue()


encode_udf = udf(encode, ByteType())


SCHEMA = """
{
    "namespace": "kakfa-avro.test",
     "type": "record",
     "name": "User",
     "fields":[
         {   "name": "name",  "type": "string"},
         {   "name": "num", "type": ["int", "null"]}
     ]
}
"""

df = spark.createDataFrame([('Mary', 1), ('Brad', 2)], ('name', 'num'))

df.select(encode_udf(lit(SCHEMA), struct([col(c).alias(c) for c in df.columns])).alias('value')).show()

My encode function for PySpark properly encodes a single row, but I can't seem to get it applied to the whole DataFrame. Any pointers would be greatly appreciated!

Exception:

Py4JJavaError: An error occurred while calling o463.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 15.0 failed 1 times, most recent failure: Lost task 2.0 in stage 15.0 (TID 38, localhost, executor driver): org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 177, in main
    process()
  File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 172, in process
    serializer.dump_stream(func(split_index, iterator), outfile)
  File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 220, in dump_stream
    self.serializer.dump_stream(self._batched(iterator), stream)
  File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 138, in dump_stream
    for obj in iterator:
  File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/serializers.py", line 209, in _batched
    for item in iterator:
  File "<string>", line 1, in <lambda>
  File "/usr/local/Cellar/apache-spark/2.2.1/libexec/python/lib/pyspark.zip/pyspark/worker.py", line 71, in <lambda>
    return lambda *a: f(*a)
  File "<ipython-input-1-e5915375e985>", line 13, in encode
  File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 1284, in Parse
    return SchemaFromJSONData(json_data, names)
  File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 1255, in SchemaFromJSONData
    return parser(json_data, names=names)
  File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 1132, in _SchemaFromJSONString
    schema = names.GetSchema(name=json_string)
  File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 417, in GetSchema
    avro_name = self.GetName(name=name, namespace=namespace)
  File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 398, in GetName
    return Name(name=name, namespace=namespace)
  File "/usr/local/lib/python3.6/site-packages/avro/schema.py", line 312, in __init__
    'Invalid absolute schema name: %r.' % self._fullname)
avro.schema.SchemaParseException: Invalid absolute schema name: '\n{\n    "namespace": "kakfa-avro.test",\n     "type": "record",\n     "name": "User",\n     "fields":[\n         {   "name": "name",  "type": "string"},\n         {   "name": "num", "type": ["int", "null"]}\n     ]\n}\n'.
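
The traceback passes through `_SchemaFromJSONString` and `GetSchema(name=json_string)`, which suggests the parser received a JSON string (treated as a type name) rather than a JSON object. A minimal sketch of how `json.dumps` on a schema that is already a `str` would produce that, using only the standard library; the namespace `example.avro` is a hypothetical placeholder, not the one from the question:

```python
import json

# Schema text as a Python str, same shape as in the question.
# The namespace "example.avro" is a hypothetical placeholder.
SCHEMA = """
{
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "num", "type": ["int", "null"]}
    ]
}
"""

# json.dumps on a str wraps it in quotes and escapes it: the result is a
# JSON *string literal* whose content happens to be the schema text.
double_encoded = json.dumps(SCHEMA)

# Decoding the double-encoded value gives back a str, so a schema parser
# would see a single string (a name) instead of a record definition.
print(type(json.loads(double_encoded)).__name__)  # str

# Decoding the original text gives the record definition as a dict.
print(type(json.loads(SCHEMA)).__name__)  # dict
```

If that is what happens here, passing the schema text to the parser directly, without `json.dumps`, would avoid this particular exception.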
moku
  • Missing `lit` (like `encode_udf(lit(SCHEMA), ...)`)? – Alper t. Turker Jul 13 '18 at 14:41
  • Possible duplicate of [How to pass a constant value to Python UDF?](https://stackoverflow.com/questions/35375255/how-to-pass-a-constant-value-to-python-udf) – Alper t. Turker Jul 13 '18 at 14:44
  • hmmm I get `AttributeError: module 'avro' has no attribute 'schema'` when I add `lit(SCHEMA)` – moku Jul 13 '18 at 14:44
  • It looks like some other mixup. What if you import in the function? – Alper t. Turker Jul 13 '18 at 14:47
  • @user8371915 I'm not sure what you mean? All my imports are declared at the top of the code block. – moku Jul 13 '18 at 14:51
  • On a side note I see you copied code from [my answer](https://stackoverflow.com/a/47952472/8371915) - I'd appreciate [attribution](https://stackoverflow.blog/2009/06/25/attribution-required/). – Alper t. Turker Jul 13 '18 at 14:52
  • `def encode(schema, row): import avro.schema ...` – Alper t. Turker Jul 13 '18 at 14:52
  • @user8371915 I'll certainly add the attribution. Thanks for letting me know about it. – moku Jul 13 '18 at 14:54
  • Also you could use Scala code directly - [How to use a Scala class inside Pyspark](https://stackoverflow.com/q/36023860/8371915) as Python Avro used to be very slow. – Alper t. Turker Jul 13 '18 at 15:02
  • That is cool didn't know about that. Looks like importing `avro.schema` in the encode function doesn't solve the exception... – moku Jul 13 '18 at 15:05
  • At first glance the code looks valid, but I have no Python 2 installation to test it. Could you post a full traceback? – Alper t. Turker Jul 13 '18 at 15:24
  • I'm using python3... there is an `avro-python3` package. Anyway, I closed the PySpark shell, then moved the avro imports into the encode function, and that seemed to solve the import exception. Now I'm getting an `avro.schema.SchemaParseException: Invalid absolute schema name` exception. Posting above... – moku Jul 13 '18 at 15:27
  • I see two problems: a) There should be no `json.dumps` - data is already JSON `str`. b) Schema name cannot contain a hyphen, if I remember correctly. – Alper t. Turker Jul 13 '18 at 18:18
  • @moku were you able to resolve this issue? – Gowtham Jul 15 '19 at 14:46
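
A sketch of how the two points raised in the comments might be applied, offered as a starting point rather than a verified fix. It assumes the `avro` or `avro-python3` package is installed on the executors (the parser is spelled `Parse` in `avro-python3` and `parse` in newer `avro` releases, so the code looks for either), that the struct column reaches the UDF as a `Row`, and that a hyphen-free namespace is acceptable; `example.avro` and the helper `build_value_column` are hypothetical names. It also swaps `ByteType` for `BinaryType`, since the UDF returns a byte sequence rather than a single byte.

```python
import io

# Hyphen-free namespace ("example.avro" is a hypothetical placeholder).
SCHEMA = """
{
    "namespace": "example.avro",
    "type": "record",
    "name": "User",
    "fields": [
        {"name": "name", "type": "string"},
        {"name": "num", "type": ["int", "null"]}
    ]
}
"""


def encode(row, schema_json=SCHEMA):
    """Encode one Spark Row as Avro binary (sketch, see assumptions above)."""
    # Imported inside the function so the import happens on the executor.
    import avro.io
    import avro.schema

    # avro-python3 exposes Parse; newer avro releases expose parse.
    parse = getattr(avro.schema, "Parse", None) or avro.schema.parse
    # The schema is already JSON text, so it is parsed directly (no json.dumps).
    avro_schema = parse(schema_json)

    writer = avro.io.DatumWriter(avro_schema)
    buf = io.BytesIO()
    # DatumWriter expects a dict for a record, so the Row is converted first.
    writer.write(row.asDict(), avro.io.BinaryEncoder(buf))
    return bytearray(buf.getvalue())


def build_value_column(df):
    """Return a DataFrame with one binary 'value' column (hypothetical helper)."""
    from pyspark.sql.functions import struct, udf
    from pyspark.sql.types import BinaryType

    # BinaryType matches the byte sequence returned by encode.
    encode_udf = udf(encode, BinaryType())
    return df.select(encode_udf(struct(*df.columns)).alias("value"))
```

With the DataFrame from the question, this would be called as `build_value_column(df).show()`. Parsing the schema on every row is simple but wasteful; caching the parsed schema per executor would be a natural refinement if this approach works.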

0 Answers