
I'm pushing a stream of data to Azure Event Hubs with the following code, leveraging Microsoft.Hadoop.Avro. This code runs every 5 seconds and simply sends the same two Avro-serialised items:

  var strSchema = File.ReadAllText("schema.json");
  var avroSerializer = AvroSerializer.CreateGeneric(strSchema);
  var rootSchema = avroSerializer.WriterSchema as RecordSchema;

  var itemList = new List<AvroRecord>();

  dynamic record_one = new AvroRecord(rootSchema);
  record_one.FirstName = "Some";
  record_one.LastName = "Guy";
  itemList.Add(record_one);

  dynamic record_two = new AvroRecord(rootSchema);
  record_two.FirstName = "A.";
  record_two.LastName = "Person";
  itemList.Add(record_two);

  using (var buffer = new MemoryStream())
  {
      using (var writer = AvroContainer.CreateGenericWriter(strSchema, buffer, Codec.Null))
      {
          using (var streamWriter = new SequentialWriter<object>(writer, itemList.Count))
          {
              foreach (var item in itemList)
              {
                  streamWriter.Write(item);
              }
          }
      }

      eventHubClient.SendAsync(new EventData(buffer.ToArray()));
  }

The schema used here is, again, very simple:

{
  "type": "record",
  "name": "User",
  "namespace": "SerDes",
  "fields": [
    {
      "name": "FirstName",
      "type": "string"
    },
    {
      "name": "LastName",
      "type": "string"
    }
  ]
}

I have validated that this all works, with a quick look in Azure Stream Analytics on the portal:

Stream Analytics Screenshot

So far so good, but I cannot, for the life of me, correctly deserialize this in Databricks using the from_avro() function under Scala.

Load (the exact same) schema as a string:

val sampleJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

Configure the Event Hub connection:

val connectionString = ConnectionStringBuilder("<CONNECTION_STRING>")
  .setEventHubName("<NAME_OF_EVENT_HUB>")
  .build

val eventHubsConf = EventHubsConf(connectionString).setStartingPosition(EventPosition.fromEndOfStream)
val eventhubs = spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load()

Read the data:

// this works, and i can see the serialised data
display(eventhubs.select($"body"))

// this fails, and with an exception: org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
display(eventhubs.select(from_avro($"body", sampleJsonSchema)))

So essentially, what is going on here? I am serialising the data with the same schema that I'm deserialising it with, but something is malformed. The documentation is incredibly sparse on this front (very, very minimal on the Microsoft website).

m1nkeh

1 Answer


The issue

After additional investigation (and mainly with the help of this article), I found what my problem was: from_avro(data: Column, jsonFormatSchema: String) expects the Spark schema format and not the Avro schema format. The documentation is not very clear on this.
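
To make the difference concrete, here is a minimal sketch of the conversion between the two formats (assuming the spark-avro dependency is on the classpath, and reusing the schema path from the question):

import org.apache.avro.Schema
import org.apache.spark.sql.avro.SchemaConverters

// the Avro schema in JSON format, exactly as used by the C# producer
val avroJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

// convert it to the Spark SQL representation; for the User record above this
// yields a StructType with FirstName and LastName string fields
val sparkDataType = SchemaConverters.toSqlType(new Schema.Parser().parse(avroJsonSchema)).dataType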

Solution 1

Databricks provides a handy method from_avro(column: Column, subject: String, schemaRegistryUrl: String) that fetches the needed Avro schema from a Kafka Schema Registry and automatically converts it to the correct format.

Unfortunately, it is not available in pure Spark, nor is it possible to use it without a Kafka Schema Registry.
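
For reference, on Databricks that variant is used roughly like this (a sketch, not code from the question; the subject name "User-value" and the registry address are placeholders):

import org.apache.spark.sql.avro.functions._

// assumed registry address and subject name, following the Kafka <topic>-value convention
val schemaRegistryAddr = "https://my-schema-registry:8081"

// Databricks resolves the writer schema for the subject from the registry
val decoded = eventhubs.select(
  from_avro($"body", "User-value", schemaRegistryAddr).as("user")
)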

Solution 2

Use the schema conversion provided by Spark:

// imports needed by the deserializer and the schema conversion
import io.confluent.kafka.schemaregistry.client.{CachedSchemaRegistryClient, SchemaRegistryClient}
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.functions.{col, from_json}

// define avro deserializer; the schema registry client is injected so that
// the parent class can resolve the writer schema for each record
class AvroDeserializer(client: SchemaRegistryClient) extends AbstractKafkaAvroDeserializer {
  this.schemaRegistry = client

  override def deserialize(payload: Array[Byte]): String = {
    // call the parent implementation (not this.deserialize, which would recurse)
    // and render the resulting GenericRecord as a JSON string
    val genericRecord = super.deserialize(payload).asInstanceOf[GenericRecord]
    genericRecord.toString
  }
}

// get avro schema from registry (but I presume that it should also work with a schema
// read from a local file); kafkaSchemaRegistryUrl and topic are assumed to be defined elsewhere
val registryClient = new CachedSchemaRegistryClient(kafkaSchemaRegistryUrl, 128)
val avroSchema = registryClient.getLatestSchemaMetadata(topic + "-value").getSchema
val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

// create deserializer instance
val deserializer = new AvroDeserializer(registryClient)

// register deserializer as a UDF that turns the avro payload into a JSON string
spark.udf.register("deserialize_avro", (bytes: Array[Byte]) =>
  deserializer.deserialize(bytes)
)

// consume data: deserialize the raw bytes, then parse the JSON with the converted spark schema
df.selectExpr("deserialize_avro(value) as data")
  .select(from_json(col("data"), sparkSchema.dataType).as("data"))
  .select("data.*")
noscreenname
  • so i assume by this you are actually using a schema registry? from recollection (this is quite an old question now) i don't think i had a schema registry.. this would imply you are leveraging apache kafka perhaps? i will give it another whirl though - still got the code somewhere i will also double check my spark version – m1nkeh Jul 03 '20 at 17:26
  • p.s. i wrote my stuff in PySpark – m1nkeh Jul 03 '20 at 17:28
  • 2
    `from_avro` with direct support for Schema registry is for Databricks only, as I remember... in stock Spark it requires JSON schema, that you may get from registry via HTTP – Alex Ott Jul 04 '20 at 09:07
  • Yeah, you are right, it works in a databricks notebook, but not in pure Spark :/ – noscreenname Jul 04 '20 at 09:14
  • Edit with more details – noscreenname Jul 06 '20 at 09:21
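
For completeness, the stock Spark variant that the comment above refers to takes the Avro schema as a plain JSON string; a minimal sketch of that signature (assuming Spark 3.x, where from_avro lives in org.apache.spark.sql.avro.functions):

import org.apache.spark.sql.avro.functions.from_avro

// Avro schema as a JSON string, read from a file (or fetched from a registry over HTTP)
val jsonFormatSchema = dbutils.fs.head("/mnt/schemas/schema.json")

val decoded = eventhubs.select(from_avro($"body", jsonFormatSchema).as("user"))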