
I have been trying to integrate Kafka with Spring Boot, but it doesn't seem to work. I am getting an exception while consuming the message. I think the publisher works fine, but the consumer fails to deserialize the message.

Caused by: java.lang.ClassCastException: class com.example.schema.avro.Event cannot be cast to 
    class com.example.schema.avro.Event (com.example.schema.avro.Event is in unnamed module of loader
     'app'; com.example.schema.avro.Event is in unnamed module of loader
     org.springframework.boot.devtools.restart.classloader.RestartClassLoader @58da768e)

I searched for the exception, and the suggested solution was to set `specific.avro.reader: true`, but that didn't work for me.

This is my pom file

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>3.2.4</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-schema</artifactId>
            <version>2.2.1.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>5.3.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.avro</groupId>
            <artifactId>avro</artifactId>
            <version>1.11.0</version>
        </dependency>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.avro</groupId>
                <artifactId>avro-maven-plugin</artifactId>
                <version>1.8.2</version>
                <executions>
                    <execution>
                        <id>schemas</id>
                        <phase>generate-sources</phase>
                        <goals>
                            <goal>schema</goal>
                            <goal>protocol</goal>
                            <goal>idl-protocol</goal>
                        </goals>
                        <configuration>
                            <sourceDirectory>${project.basedir}/src/main/resources/schema/</sourceDirectory>
                            <outputDirectory>${project.basedir}/target/generated-sources/avro/</outputDirectory>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    <repositories>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

And this is my YAML file:

spring:
  main:
    allow-bean-definition-overriding: true
  kafka:
    bootstrap-servers: localhost:9092
    binder:
      producer-properties:
        key.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
        schema.registry.url: http://localhost:8081
      consumer-properties:
        key.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        value.deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
        schema.registry.url: http://localhost:8081
        specific.avro.reader: true
  cloud:
    stream:
      schemaRegistryClient:
        endpoint: http://localhost:8081
      bindings:
        event-out-0:
          destination: event-details
          contentType: application/*+avro
        event-in-0:
          destination: event-details
          contentType: application/*+avro
      function:
        definition: event

And this is the code to publish an event:


    public Event saveEvent(final EventDto eventDto) {
        final com.example.schema.avro.Event event = new com.example.schema.avro.Event(eventDto.eventId());
        final boolean send = streamBridge.send("event-out-0", event);
        log.info(String.valueOf(send));
        return event;
    }

Configuration

@Configuration
@Slf4j
public class Config {

    @Bean
    public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") final String endPoint) {
        final ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endPoint);
        return client;
    }

    @Bean
    public MessageConverter avroSchemaMessageConverterAnotherBean() {
        return new AvroSchemaMessageConverter(new AvroSchemaServiceManagerImpl());
    }

    @Bean
    public Function<EventDto, Event> eventOut() {
        return e -> new Event(e.eventId());
    }

    @Bean
    public Consumer<Event> event() {
        return e -> log.info("event captured {}", e.getEventId().toString());
    }
}
user1298426

  • Since it's a `ClassCastException` between two classes with the same qualified name, this looks like a class loader issue: somehow `com.example.schema.avro.Event` is incorrectly loaded by two different class loaders, such that instances pointing to one loader are not castable to the other. I don't know enough of Spring to fix it, although maybe this helps: https://stackoverflow.com/questions/826319/classcastexception-when-casting-to-the-same-class – Svend Jul 10 '22 at 08:46
  • Thanks for pointing to the link. Indeed, the problem was due to the spring-boot-devtools jar. – user1298426 Jul 10 '22 at 09:13
  • You can avoid the problem by configuring the consumer factory with the deserializer directly (e.g. `setValueDeserializer()` or via the constructor). That way, the deserializer will be loaded by the same class loader, instead of by the kafka clients jar by class name. – Gary Russell Jul 11 '22 at 13:37
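
To confirm Svend's class-loader diagnosis, one quick check is to log which loader produced each side of the failing cast. This is a minimal diagnostic sketch, not code from the question; the helper name and the `payload` parameter are hypothetical:

    // Hypothetical diagnostic helper: compare the loader of the Event class
    // compiled into the application with the loader of the deserialized payload.
    // 'payload' stands for the raw object returned by the deserializer.
    static void logLoaders(final Object payload) {
        System.out.println("compiled-in Event loader: " + com.example.schema.avro.Event.class.getClassLoader());
        System.out.println("payload class loader: " + payload.getClass().getClassLoader());
        // With devtools active, one side prints the RestartClassLoader and the
        // other the 'app' loader, matching the exception message above.
    }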

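Since the asker confirmed spring-boot-devtools as the culprit, one way to rule it out without removing the dependency is to disable the restart class loader before the application starts; `spring.devtools.restart.enabled` is a documented devtools switch, and the application class name below is a placeholder:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;

    @SpringBootApplication
    public class DemoApplication { // placeholder application class

        public static void main(String[] args) {
            // Must be set before SpringApplication.run(...) so devtools never
            // installs the RestartClassLoader that loads Event a second time.
            System.setProperty("spring.devtools.restart.enabled", "false");
            SpringApplication.run(DemoApplication.class, args);
        }
    }

Removing the spring-boot-devtools dependency from the pom has the same effect.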
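
And a rough sketch of Gary Russell's suggestion, phrased in plain spring-kafka terms (wiring it into the Spring Cloud Stream binder may need extra steps); the configuration class name is hypothetical. Because the deserializer is instantiated in application code, it resolves `Event` through the same class loader as the rest of the application, instead of being created by class name inside the kafka-clients jar:

    import java.util.HashMap;
    import java.util.Map;

    import io.confluent.kafka.serializers.KafkaAvroDeserializer;
    import io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.kafka.core.ConsumerFactory;
    import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

    @Configuration
    public class AvroConsumerConfig { // hypothetical configuration class

        @Bean
        public ConsumerFactory<String, Object> avroConsumerFactory() {
            final Map<String, Object> props = new HashMap<>();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put("schema.registry.url", "http://localhost:8081");
            props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

            // Instantiate the deserializer here so it is loaded by the application
            // class loader, not looked up by name inside kafka-clients.
            final KafkaAvroDeserializer valueDeserializer = new KafkaAvroDeserializer();
            valueDeserializer.configure(props, false); // false = value (not key) deserializer

            return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(), valueDeserializer);
        }
    }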