Hello everyone, I'm struggling with (de-)serializing a simple Avro schema together with Schema Registry.
The setup:
- 2 Flink jobs written in Java (one consumer, one producer)
- 1 Confluent Schema Registry for schema validation
- 1 Kafka cluster for messaging
The target:
The producer should send a message serialized with ConfluentRegistryAvroSerializationSchema, which includes updating and validating the schema.
The consumer should then deserialize the message into an object with the received schema, using ConfluentRegistryAvroDeserializationSchema.
So far so good:
If I configure my subject on the schema registry to be FORWARD-compatible, the producer writes the correct Avro schema to the registry, but it ends with the following error (even if I completely and permanently delete the subject first):
Failed to send data to Kafka: Schema being registered is incompatible with an earlier schema for subject "my.awesome.MyExampleEntity-value"
The schema was successfully written:
{
"subject": "my.awesome.MyExampleEntity-value",
"version": 1,
"id": 100028,
"schema": "{\"type\":\"record\",\"name\":\"input\",\"namespace\":\"my.awesome.MyExampleEntity\",\"fields\":[{\"name\":\"serialNumber\",\"type\":\"string\"},{\"name\":\"editingDate\",\"type\":\"int\",\"logicalType\":\"date\"}]}"
}
Following this, I could try to set the compatibility to NONE.
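For context, the level can be changed per subject through the registry's REST API (PUT /config/{subject}). A minimal sketch of building that request, assuming Java 11+ and a registry reachable at http://localhost:8081 (a placeholder, not my real address); the class and method names are made up for illustration:

```java
import java.net.URI;
import java.net.http.HttpRequest;

final class CompatibilityRequest {

    // Builds (but does not send) a PUT /config/{subject} request that sets
    // the compatibility level for a single subject.
    static HttpRequest build(String registryUrl, String subject, String level) {
        String body = "{\"compatibility\": \"" + level + "\"}";
        return HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/config/" + subject))
                .header("Content-Type", "application/vnd.schemaregistry.v1+json")
                .PUT(HttpRequest.BodyPublishers.ofString(body))
                .build();
    }

    public static void main(String[] args) {
        // The registry URL is an assumption; the subject is the one from this question.
        HttpRequest request = build(
                "http://localhost:8081", "my.awesome.MyExampleEntity-value", "NONE");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending it with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString()) should echo the new level back on success.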
If I do so, I can produce my data to Kafka, but the schema registry then has a new version of my schema looking like this:
{
"subject": "my.awesome.MyExampleEntity-value",
"version": 2,
"id": 100031,
"schema": "\"bytes\""
}
Now I can produce data, but the consumer is not able to deserialize it and emits the following error:
Caused by: org.apache.avro.AvroTypeException: Found bytes, expecting my.awesome.MyExampleEntity
...
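To narrow this down, it may help to check which schema ID the produced records actually carry. A minimal sketch, assuming the standard Confluent wire format (one magic byte 0, then a 4-byte big-endian schema ID, then the Avro payload); the class name ConfluentWireHeader is made up for illustration:

```java
import java.nio.ByteBuffer;

final class ConfluentWireHeader {

    static final byte MAGIC_BYTE = 0x0;

    // Reads the schema ID from the first five bytes of a Confluent-framed record value.
    static int schemaId(byte[] value) {
        if (value == null || value.length < 5) {
            throw new IllegalArgumentException("value too short for a Confluent header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        byte magic = buffer.get();
        if (magic != MAGIC_BYTE) {
            throw new IllegalArgumentException("unexpected magic byte: " + magic);
        }
        return buffer.getInt();
    }

    public static void main(String[] args) {
        // Hand-built sample: header for schema ID 100031 followed by one payload byte.
        byte[] sample = ByteBuffer.allocate(6)
                .put(MAGIC_BYTE)
                .putInt(100031)
                .put((byte) 0x02)
                .array();
        System.out.println(schemaId(sample)); // prints 100031
    }
}
```

If the ID read from a real record is 100031, the producer serialized against the "bytes" schema rather than the record schema with ID 100028.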
I'm currently not sure where exactly the problem is. Even if I completely and permanently delete the subject (including schemas), my producer should work fine from scratch, registering a whole "new" subject with its schema. On the other hand, if I set the compatibility to "NONE", the schema exchange should work anyway by registering a schema which can be read by the consumer.
Can anybody help me out here?