
I'm using Apache Pulsar for the first time and am finding it hard to publish and listen to custom objects. I have defined schemas as directed by the reference pages, but for some reason it's not working.

@SpringBootApplication
public class QuickbooksApplication {

  public static void main(String[] args) {
    SpringApplication.run(QuickbooksApplication.class, args);
  }

  @Data
  @Builder
  @ToString
  @NoArgsConstructor
  @AllArgsConstructor
  public static class SomeClass {
    private String someVariable;
  }

  @Bean
  ApplicationRunner runner(PulsarTemplate<SomeClass> pulsarTemplate) {
    pulsarTemplate.setSchema(JSONSchema.of(SomeClass.class));
    SomeClass someClass = SomeClass.builder().someVariable("Hello World!!!").build();
    return (args) -> pulsarTemplate.send("hello-pulsar", someClass);
  }

  @PulsarListener(
      subscriptionName = "hello-pulsar-subscription",
      topics = "hello-pulsar",
      schemaType = SchemaType.AUTO_CONSUME)
  void listen(SomeClass message) {
    System.out.println("Message Received: " + message);
  }
}

When I run this, I get two errors:

java.lang.IllegalAccessException: class org.apache.pulsar.common.util.netty.DnsResolverUtil cannot access class sun.net.InetAddressCachePolicy (in module java.base) because module java.base does not export sun.net to unnamed module @544ff9ef at java.base/jdk.internal.reflect.Reflection.newIllegalAccessException(Reflection.java:392) ~[na:an]

java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.client.api.Schema.getSchemaInfo()" because the return value of "org.springframework.pulsar.listener.PulsarContainerProperties.getSchema()" is null

When I consume the message as a String, though, everything works.

Any help will be appreciated. Thanks in advance.

sobychacko
Keshavram Kuduwa

2 Answers


The first error is caused by Java 17's strong encapsulation of JDK internals, which is enabled by default. To get around it, you have to pass --add-opens everywhere you launch the app. Here is an example that sets it for bootRun when using the Spring Boot Gradle Plugin to launch the app. This is an annoyance, but it is not the cause of the final NPE.
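To see whether the flag has taken effect in a given launch configuration, a small check like the one below can help. It is only a sketch: the class name SunNetOpenCheck is made up for illustration, and the flag value --add-opens java.base/sun.net=ALL-UNNAMED is my reading of the error message above (java.base does not export sun.net to the unnamed module), not something taken from the linked example.

```java
// Hypothetical helper, not part of Pulsar or Spring: reports whether
// java.base opens the sun.net package to classpath code.
public class SunNetOpenCheck {

    // Returns true when java.base opens sun.net to the unnamed module,
    // which is where classes loaded from the classpath live.
    static boolean sunNetIsOpen() {
        Module javaBase = Object.class.getModule();
        Module unnamed = ClassLoader.getSystemClassLoader().getUnnamedModule();
        return javaBase.isOpen("sun.net", unnamed);
    }

    public static void main(String[] args) {
        // Assumed flag, inferred from the error text:
        //   --add-opens java.base/sun.net=ALL-UNNAMED
        System.out.println("sun.net open to unnamed module: " + sunNetIsOpen());
    }
}
```

Running it with and without the flag on the java command line should show whether the JVM options actually reach the process you are launching.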

The actual cause of the NPE is that SchemaType.AUTO_CONSUME is not supported in @PulsarListener. Set schemaType to SchemaType.JSON instead. See here for more details.

onobc

You can try using SchemaType.JSON and make sure your SomeClass object follows the conventions required for JSON serialization/deserialization.
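As a rough illustration of those conventions, here is a plain-Java sketch of what the Lombok annotations in the question generate. I am assuming the JSON schema relies on a no-argument constructor plus getters and setters, as Jackson-style mappers usually do. The class name SomeClassPojo is made up so that it does not clash with the original SomeClass.

```java
// Illustrative plain-Java equivalent of a Lombok @Data/@NoArgsConstructor class.
public class SomeClassPojo {

    private String someVariable;

    // No-argument constructor so a mapper can instantiate the class
    // before populating its fields.
    public SomeClassPojo() {
    }

    public SomeClassPojo(String someVariable) {
        this.someVariable = someVariable;
    }

    // Getter and setter following the JavaBeans naming convention.
    public String getSomeVariable() {
        return someVariable;
    }

    public void setSomeVariable(String someVariable) {
        this.someVariable = someVariable;
    }

    @Override
    public String toString() {
        return "SomeClassPojo(someVariable=" + someVariable + ")";
    }

    public static void main(String[] args) {
        SomeClassPojo message = new SomeClassPojo();
        message.setSomeVariable("Hello World!!!");
        System.out.println(message);
    }
}
```

The SomeClass in the question already meets these requirements through @Data and @NoArgsConstructor, so the class itself should not need to change.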

You can include the SchemaType in your consumer as below:

@PulsarListener(subscriptionName = "hello-pulsar-subscription", topics = "hello-pulsar", schemaType = SchemaType.JSON)
public void consumeMessage(SomeClass message) {
    // further processing of your message
}

For a complete write-up on a similar topic, see the Apache Pulsar and Spring Boot post.

Sangam Belose