I have been trying to integrate Kafka with Spring Boot, but it doesn't seem to work. I get an exception while consuming a message. The publisher appears to work fine, but the consumer fails to deserialize the message:
Caused by: java.lang.ClassCastException: class com.example.schema.avro.Event cannot be cast to
class com.example.schema.avro.Event (com.example.schema.avro.Event is in unnamed module of loader
'app'; com.example.schema.avro.Event is in unnamed module of loader
org.springframework.boot.devtools.restart.classloader.RestartClassLoader @58da768e)
I searched for the exception, and the suggested fix was to set specific.avro.reader: true, but that didn't work for me. As far as I can tell from the stack trace, the same Event class is loaded twice, once by the application classloader and once by the DevTools RestartClassLoader, which is why the cast of the class to itself fails.
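For what it's worth, the RestartClassLoader in the stack trace comes from Spring Boot DevTools, which I also have in my pom (declared the usual way; the version is managed by the Boot parent, and the optional flag is the standard convention):

<!-- spring-boot-devtools is on the classpath; the RestartClassLoader comes from here -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-devtools</artifactId>
    <optional>true</optional>
</dependency>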
Here are the relevant parts of my pom file:
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
    <version>3.2.4</version>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-schema</artifactId>
    <version>2.2.1.RELEASE</version>
</dependency>
<dependency>
    <groupId>io.confluent</groupId>
    <artifactId>kafka-avro-serializer</artifactId>
    <version>5.3.0</version>
</dependency>
<dependency>
    <groupId>org.apache.avro</groupId>
    <artifactId>avro</artifactId>
    <version>1.11.0</version>
</dependency>

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro-maven-plugin</artifactId>
            <version>1.8.2</version>
            <executions>
                <execution>
                    <id>schemas</id>
                    <phase>generate-sources</phase>
                    <goals>
                        <goal>schema</goal>
                        <goal>protocol</goal>
                        <goal>idl-protocol</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                        <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

<repositories>
    <repository>
        <id>confluent</id>
        <url>https://packages.confluent.io/maven/</url>
    </repository>
</repositories>
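The schema that the plugin compiles lives in src/main/resources/schema and is essentially this (a sketch; my real schema may have more fields, but eventId is the only one the code below uses):

{
  "type": "record",
  "name": "Event",
  "namespace": "com.example.schema.avro",
  "fields": [
    { "name": "eventId", "type": "string" }
  ]
}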
And my application.yaml file:
spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    bootstrap-servers: localhost:9092
    binder:
      producer-properties:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        schema.registry.url: http://localhost:8081
      consumer-properties:
        key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        schema.registry.url: http://localhost:8081
        specific.avro.reader: true
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081
      bindings:
        event-out-0:
          destination: event-details
          contentType: application/*+avro
        event-in-0:
          destination: event-details
          contentType: application/*+avro
      function:
        definition: event
And this is the code that publishes an event:
public void saveEvent(final EventDto eventDto) {
    final boolean send = streamBridge.send("event-out-0", new com.example.schema.avro.Event(eventDto.eventId()));
    log.info(String.valueOf(send));
}
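Here streamBridge is a constructor-injected org.springframework.cloud.stream.function.StreamBridge, and EventDto is just a thin wrapper around the id, roughly:

// sketch of my DTO; the accessor matches the eventDto.eventId() call above
public record EventDto(String eventId) {
}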
My configuration class:
@Configuration
@Slf4j
public class Config {

    @Bean
    public SchemaRegistryClient schemaRegistryClient(
            @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") final String endPoint) {
        final ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endPoint);
        return client;
    }

    @Bean
    public MessageConverter avroSchemaMessageConverterAnotherBean() {
        return new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
    }

    @Bean
    public Function<EventDto, Event> eventOut() {
        return e -> new Event(e.eventId());
    }

    @Bean
    public Consumer<Event> event() {
        return e -> log.info("event captured {}", e.getEventId().toString());
    }
}
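For reference, the Event class generated by the avro-maven-plugin boils down to this (my hand-written approximation; the real generated file also contains a SCHEMA$ constant, builders, and serialization helpers):

package com.example.schema.avro;

import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificRecordBase;

// approximation of the generated specific record; the field type is CharSequence
// because the plugin's default stringType is CharSequence
public class Event extends SpecificRecordBase {

    private static final Schema SCHEMA = new Schema.Parser().parse(
        "{\"type\":\"record\",\"name\":\"Event\",\"namespace\":\"com.example.schema.avro\","
        + "\"fields\":[{\"name\":\"eventId\",\"type\":\"string\"}]}");

    private CharSequence eventId;

    public Event() {
    }

    public Event(CharSequence eventId) {
        this.eventId = eventId;
    }

    public CharSequence getEventId() {
        return eventId;
    }

    @Override
    public Schema getSchema() {
        return SCHEMA;
    }

    @Override
    public Object get(int field) {
        return eventId;
    }

    @Override
    public void put(int field, Object value) {
        this.eventId = (CharSequence) value;
    }
}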