0

I am new to Kafka and spring cloud stream. Now, I have a problem with start a Kafka project for sending message. It is show Null Pointer exception when first running my application.

log

java.lang.NullPointerException: null
    at java.base/java.util.HashMap.putMapEntries(HashMap.java:497) ~[na:na]
    at java.base/java.util.LinkedHashMap.<init>(LinkedHashMap.java:385) ~[na:na]
    at org.springframework.core.annotation.MapAnnotationAttributeExtractor.enrichAndValidateAttributes(MapAnnotationAttributeExtractor.java:93) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.core.annotation.MapAnnotationAttributeExtractor.<init>(MapAnnotationAttributeExtractor.java:58) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.core.annotation.AnnotationUtils.synthesizeAnnotation(AnnotationUtils.java:1609) ~[spring-core-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.cloud.stream.config.BindingBeansRegistrar.collectClasses(BindingBeansRegistrar.java:56) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.cloud.stream.config.BindingBeansRegistrar.registerBeanDefinitions(BindingBeansRegistrar.java:43) ~[spring-cloud-stream-3.0.8.RELEASE.jar:3.0.8.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.lambda$loadBeanDefinitionsFromRegistrars$1(ConfigurationClassBeanDefinitionReader.java:364) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at java.base/java.util.LinkedHashMap.forEach(LinkedHashMap.java:723) ~[na:na]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsFromRegistrars(ConfigurationClassBeanDefinitionReader.java:363) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitionsForConfigurationClass(ConfigurationClassBeanDefinitionReader.java:145) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassBeanDefinitionReader.loadBeanDefinitions(ConfigurationClassBeanDefinitionReader.java:117) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.processConfigBeanDefinitions(ConfigurationClassPostProcessor.java:327) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.annotation.ConfigurationClassPostProcessor.postProcessBeanDefinitionRegistry(ConfigurationClassPostProcessor.java:232) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanDefinitionRegistryPostProcessors(PostProcessorRegistrationDelegate.java:275) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.PostProcessorRegistrationDelegate.invokeBeanFactoryPostProcessors(PostProcessorRegistrationDelegate.java:95) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.invokeBeanFactoryPostProcessors(AbstractApplicationContext.java:705) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:531) ~[spring-context-5.1.9.RELEASE.jar:5.1.9.RELEASE]
    at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:141) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:744) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:391) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:312) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1215) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at org.springframework.boot.SpringApplication.run(SpringApplication.java:1204) ~[spring-boot-2.1.8.RELEASE.jar:2.1.8.RELEASE]
    at stream.websokect.demo.DemoApplication.main(DemoApplication.java:15) ~[classes/:na]

application.properties


spring.cloud.stream.bindings.output.destination= meetUpTopic
spring.cloud.stream.bindings.output.producer.partition-count= 1

spring.cloud.stream.bindings.output.content-type= text/plain
spring.cloud.stream.bindings.output.producer.header-mode= raw

DemoApplication

import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.web.socket.client.WebSocketClient;
import org.springframework.web.socket.client.standard.StandardWebSocketClient;

@SpringBootApplication
public class DemoApplication {
    public static String MEET_UP_STREAM = "ws://stream.meetup.com/2/rsvps";

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, args);
    }

    @Bean
    public ApplicationRunner initializeConnection(RsvpWebSocketHandler rsvpWebSocketHandler) {
        System.out.println("Hello=============");
        return args -> {
            WebSocketClient webSocketClient = new StandardWebSocketClient();

            webSocketClient.doHandshake(rsvpWebSocketHandler, MEET_UP_STREAM);
        };
    }

}

RsvpKafkaProducer

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketMessage;
@Component
@EnableBinding(Source.class)
public class RsvpKafkaProducer {
    private final Source source;

    public RsvpKafkaProducer(Source source) {
        this.source = source;
    }

    public void sendRsvpMessage(WebSocketMessage<?> message) {
        source.output()
                .send(MessageBuilder.withPayload(message.getPayload()).build(), 10000);
    }
}

RsvpWebSocketHandler

import org.springframework.stereotype.Component;
import org.springframework.web.socket.WebSocketMessage;
import org.springframework.web.socket.WebSocketSession;
import org.springframework.web.socket.handler.AbstractWebSocketHandler;

@Component
public class RsvpWebSocketHandler extends AbstractWebSocketHandler {

    private final RsvpKafkaProducer rsvpKafkaProducer;

    public RsvpWebSocketHandler(RsvpKafkaProducer rsvpKafkaProducer) {
        this.rsvpKafkaProducer = rsvpKafkaProducer;
    }

    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) {
        System.out.println(message.getPayload() + "================/n");

        rsvpKafkaProducer.sendRsvpMessage(message);
    }
}

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>stream.websokect</groupId>
    <artifactId>demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>demo</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-websocket</artifactId>
            <version>4.0.0.RELEASE</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
            <version>3.0.8.RELEASE</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-stream-binder-kafka -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
            <version>3.0.8.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245

1 Answers1

0

BindingBeansRegistrar.collectClasses It's some problem with an @EnableBinding annotation; what version are you using?

I tried to reproduce it with another @EnableBinding with no classes, but it still loaded ok.

You could set a breakpoint in BindingBeansRegistrar.collectClasses to see what attributes it is trying to synthesize an annotation from.

I copied your producer to a new Spring Boot app and it worked without any problems; if you can post a stripped-down application someplace (without the websocket stuff) I can take a look to see what's wrong.

Gary Russell
  • 166,535
  • 14
  • 146
  • 179