
Hello everyone, I'm struggling with (de-)serializing a simple Avro schema together with the Schema Registry.

The setup:

  • 2 Flink jobs written in Java (one consumer, one producer)
  • 1 Confluent Schema Registry for schema validation
  • 1 Kafka cluster for messaging

The goal: the producer should send a message serialized with ConfluentRegistryAvroSerializationSchema, which includes updating and validating the schema.

The consumer should then deserialize the message into an object using the registered schema, via ConfluentRegistryAvroDeserializationSchema.
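For reference, this is roughly how the two sides are wired up; a minimal sketch (not the exact job code), assuming a generated SpecificRecord class MyExampleEntity, Flink's KafkaSink/KafkaSource connectors, and placeholder topic, broker and registry URLs:

import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroSerializationSchema;

public class WiringSketch {

    static final String REGISTRY_URL = "http://schema-registry:8081"; // placeholder
    static final String TOPIC = "my-topic";                           // placeholder

    // Producer side: the serialization schema registers/validates the schema
    // under the given subject and writes the Confluent wire format.
    static KafkaSink<MyExampleEntity> buildSink() {
        return KafkaSink.<MyExampleEntity>builder()
                .setBootstrapServers("kafka:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(TOPIC)
                        .setValueSerializationSchema(
                                ConfluentRegistryAvroSerializationSchema.forSpecific(
                                        MyExampleEntity.class,
                                        "my.awesome.MyExampleEntity-value", // subject
                                        REGISTRY_URL))
                        .build())
                .build();
    }

    // Consumer side: the deserialization schema fetches the writer schema from
    // the registry by the schema id embedded in each message.
    static KafkaSource<MyExampleEntity> buildSource() {
        return KafkaSource.<MyExampleEntity>builder()
                .setBootstrapServers("kafka:9092")
                .setTopics(TOPIC)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(
                        ConfluentRegistryAvroDeserializationSchema.forSpecific(
                                MyExampleEntity.class, REGISTRY_URL))
                .build();
    }
}

Since the serializer embeds the registry schema id in every message, the consumer resolves the writer schema by id and then reads the record into the MyExampleEntity reader schema.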

So far so good: if I configure my subject on the Schema Registry to be FORWARD-compatible, the producer writes the correct Avro schema to the registry, but it fails with the following error (even if I completely and permanently delete the subject first):

Failed to send data to Kafka: Schema being registered is incompatible with an earlier schema for subject "my.awesome.MyExampleEntity-value"

The schema was successfully written:

{
        "subject": "my.awesome.MyExampleEntity-value",
        "version": 1,
        "id": 100028,
        "schema": "{\"type\":\"record\",\"name\":\"input\",\"namespace\":\"my.awesome.MyExampleEntity\",\"fields\":[{\"name\":\"serialNumber\",\"type\":\"string\"},{\"name\":\"editingDate\",\"type\":\"int\",\"logicalType\":\"date\"}]}"
}

Following this, I could try setting the compatibility to NONE.

If I do so, I can produce my data to Kafka, but the Schema Registry then has a new version of my schema looking like this:

{
        "subject": "my.awesome.MyExampleEntity-value",
        "version": 2,
        "id": 100031,
        "schema": "\"bytes\""
}

Now I can produce data, but the consumer is not able to deserialize with this schema, emitting the following error:

Caused by: org.apache.avro.AvroTypeException: Found bytes, expecting my.awesome.MyExampleEntity
...

I'm currently not sure where exactly the problem is. Even if I completely and permanently delete the subject (including its schemas), my producer should work fine from scratch, registering a whole "new" subject with its schema. On the other hand, if I set the compatibility to "NONE", the schema exchange should work anyway, registering a schema that the consumer can read.

Can anybody help me out here?

Peter C. Glade
  • What is the property for setting the compatibility mode, and where do I add this property in the code? – MiniSu Feb 17 '22 at 09:37
  • @Sam you need to set the compatibility mode on the Schema Registry, either via config or the REST API (see the sketch after these comments). See https://docs.confluent.io/platform/current/schema-registry/develop/api.html – Peter C. Glade Feb 17 '22 at 09:49
  • How about the property schema.compatibility.level given on the https://docs.confluent.io/platform/current/schema-registry/installation/config.html page? – MiniSu Feb 17 '22 at 11:41
  • schema.compatibility.level didn't work for me. Did you find a way to set it through the Flink job during sink? – Sucheth Shivakumar Jun 13 '22 at 03:20
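Regarding the compatibility question in the comments above: as a sketch of the REST-API route mentioned there, the compatibility level of a single subject can also be changed programmatically with the Confluent schema-registry client (the io.confluent:kafka-schema-registry-client dependency, the URL, and the subject name are assumptions here):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class CompatibilitySketch {
    public static void main(String[] args) throws Exception {
        SchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://schema-registry:8081", 10);

        // Roughly equivalent to: PUT /config/my.awesome.MyExampleEntity-value {"compatibility": "NONE"}
        client.updateCompatibility("my.awesome.MyExampleEntity-value", "NONE");

        // Read back the per-subject setting to verify it.
        System.out.println(client.getCompatibility("my.awesome.MyExampleEntity-value"));
    }
}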

2 Answers


According to the latest Confluent docs: NONE means schema compatibility checks are disabled.

Niko
  • Where can I set the compatibility checks property in my Flink-Kafka based application? – MiniSu Feb 17 '22 at 10:05
  • @Sam if you still have questions, you'd better ask your own question on Stack Overflow, not in the comments. Reading the documentation can also help. To set your compatibility, see [this example](https://docs.confluent.io/platform/current/schema-registry/develop/using.html#update-compatibility-requirements-globally) – Niko Feb 18 '22 at 08:05

The whole problem with serialization was caused by the use of the following flags in the Kafka config:

"schema.registry.url"
"key.serializer"
"key.deserializer"
"value.serializer"
"value.deserializer"

Setting these flags in Flink, even if they are logically correct, leads to undebuggable schema validation and serialization chaos. After omitting all of these flags, it works fine. The registry URL needs to be set only in ConfluentRegistryAvro(De)serializationSchema.
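To illustrate the fix (a sketch, assuming the Properties-based FlinkKafkaConsumer and the same hypothetical MyExampleEntity class; topic, group id and URLs are placeholders):

import java.util.Properties;

import org.apache.flink.formats.avro.registry.confluent.ConfluentRegistryAvroDeserializationSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class ConsumerConfigSketch {
    static FlinkKafkaConsumer<MyExampleEntity> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");
        props.setProperty("group.id", "my-consumer-group");
        // Deliberately NOT set here: schema.registry.url and the
        // key/value (de)serializer classes listed above.

        return new FlinkKafkaConsumer<>(
                "my-topic",
                ConfluentRegistryAvroDeserializationSchema.forSpecific(
                        MyExampleEntity.class, "http://schema-registry:8081"),
                props);
    }
}

The producer side is analogous: only connection-level properties go into the Kafka Properties, while the registry URL is passed exclusively to ConfluentRegistryAvroSerializationSchema.forSpecific(...).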

Peter C. Glade