
I am using Spring Boot to connect to a Pulsar message broker, but I cannot access the message payload no matter what class I use as the parameter type in:

//...
void listen(<T> message) 
//...

I'm using IntelliJ IDEA 2023.1 (Community Edition).

The full class code is:

package dan.teachingagency.covernotices;

import com.fasterxml.jackson.databind.ObjectMapper;
import dan.teachingagency.Exception.MapperException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.stereotype.Service;


@Service
@SpringBootApplication
@EnablePulsar
public class CovernoticesApplication {
    public static void main(String[] args) {

        SpringApplication.run(CovernoticesApplication.class, args);
    }

//This was for my debugging purposes.
    private int messageCount = 0;

    @Autowired
    public TeacherRepository teacherRepository;

    ObjectMapper mapper = new ObjectMapper();

    @PulsarListener(
            subscriptionName = "TeacherAdminSub",
            topics = "persistent://school1/admin_topics/userdetails",
            schemaType = SchemaType.JSON,
            ackMode = AckMode.RECORD)

    void listen(Message<String> message) {
        // Initially messages weren't arriving. Then they kept getting re-delivered every time
        // I started the application until I changed the ackMode as shown.
        messageCount++;
        System.out.println("["+new java.util.Date(System.currentTimeMillis())+"] Message ["+messageCount+"]received.");

        // If I uncomment this code (which calls the Spring Boot message implementation), I think
        // the call deadlocks because the line of code does not appear to run; nothing is output.

        //System.out.println("["
        //        +new java.util.Date(System.currentTimeMillis())
        //        +"] Message received ["
        //        +this.messageCount+"]: ["
        //        +message.getPayload()+"].");

        // This is the equivalent in the Pulsar API implementation. It also does not run.
        //System.out.println("Message value: ["+message.getValue()+"]");

        /*Map<String,String> messageProperties = message.getProperties();
        String messageType = messageProperties.get("TYPE");
        System.out.println("Message type: ["+messageType+"]");
        if("TEACHER".equals(messageType)) {
            System.out.println("Saving message now. ["+message.getValue()+"]");
            StaffMember teacher = fromJson(message.getValue(), StaffMember.class);
            System.out.println("Teacher=["+teacher+"]");
            teacherRepository.save(teacher);
            System.out.println("Saved message:[" + message.getValue() + "]");
        } else {
            System.out.println("Received rogue message ["+message.getValue()+"]");
        }*/
    }

    /**
     * Convert json to Object
     * @param json String json value
     * @param clazz Instances of the class
     * @param <T> Object Class
     * @return Object class
     */
    private <T > T fromJson(String json, Class < T > clazz) {
        try {
            return mapper.readValue(json, clazz);
        } catch (Exception e) {
            throw new MapperException(e.getMessage());
        }
    }
}

I have tried: Googling for similar problems; inserting debugging statements; swapping the Message API implementation between the Spring Boot and Pulsar APIs; and turning on DEBUG mode to see if any 'hidden' exceptions are being thrown (none are).

Dan
  • Hi @dan, what version of Spring Pulsar are you using? What version of Java are you using? Can you successfully go through the [Quick Tour](https://docs.spring.io/spring-pulsar/docs/0.2.0/reference/html/#quick-tour) and then, once that is working, modify the listener to instead take a `Message`? A couple of unrelated things I noticed in your app: ``` @Service @SpringBootApplication @EnablePulsar ``` can all be replaced with `@SpringBootApplication`. Also, you could just use the object mapper provided by Spring Boot (`@Autowired`). – onobc Jun 15 '23 at 14:33
  • Yes. Tried with void listen(Message message) and got the same result. I can use String data = new String(message.getBytes(),StandardCharsets.UTF_8) and the message payload is correctly stored in 'data'; possibly a cumbersome solution. JDK version is jdk-17. Spring Boot version is 3.0.7. Apache Pulsar client API is 2.11.0. The Pulsar broker itself is v3.0.0. I have clocked off for the day but will attempt the 'Quick Tour', see if it works, and get back to you. Thanks for the tips on annotations. I'm new to Spring Boot (and Pulsar) so I probably use too many when I could use fewer. :-) – Dan Jun 15 '23 at 19:32
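
The byte-level workaround mentioned in the comment above can be sketched in plain Java as follows. This is only an illustration under stated assumptions: `RawPayloadDecoder` and `decodeUtf8` are hypothetical names, the sample JSON is made up, and the byte array stands in for whatever the client hands back (with the Pulsar Java client, the raw bytes should be available from `Message.getData()`, as far as I know).

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Pulsar or Spring: decodes a raw payload as UTF-8 text.
class RawPayloadDecoder {

    // In a real listener the byte[] would come from the received message
    // (assumption: Message.getData() in the Pulsar Java client).
    static String decodeUtf8(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Stand-in for the bytes of a JSON message; the field name is illustrative only.
        byte[] payload = "{\"name\":\"Ada\"}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(payload));
    }
}
```

The decoded string could then be passed to a JSON mapper such as the `fromJson` helper in the question, which bypasses the schema-based `getValue()` call entirely.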

1 Answer


I checked the quick start code given by @onobc in the comments but discovered it was the code I had originally started learning with anyway. It worked, but the parameter in its listener method (void listen...) was just a String rather than a Message. That had worked for me anyway. It was the Message.getValue() call that was causing the problem, which I have now solved.

In the end, I took the following steps and it started working. I'm not certain, but I suspect the problem was caused by some kind of corruption on the Pulsar broker, which makes step 1 (below) the most likely fix. I think this because I found errors in my Pulsar broker start-up logs reporting some kind of BookKeeper corruption; the broker was set to ignore unrecoverable errors in the journals, but it noted them in the logs anyway. Message delivery was haphazard: sometimes messages would keep getting re-delivered, and other times they would behave as expected at first and then suddenly all get re-delivered again. This no longer happens. Messages arrive, are ACK'd by the API 'under the hood', and are never delivered again, as expected. The steps were:

  1. Deleted the topic, using the pulsar-admin client in the Docker container where I run my Pulsar standalone broker, and allowed it to be auto-created on client (listener / consumer) startup.

  2. Opened the docker terminal for my pulsar standalone broker and used the pulsar admin client to allow schema update:

    bin/pulsar-admin namespaces set-is-allow-auto-update-schema --enable school1/admin_topics

  3. Removed unused reactive Pulsar Spring Boot dependencies from my pom.xml, just in case of conflicting classes (though I was fairly certain there were none because I checked all my imports), and reloaded the project via the Maven menu (right click on top-level module node->maven->Reload project). Those dependencies were there because I was originally thinking of using reactive Pulsar but decided against it, for now.

  4. Removed various Pulsar annotations from the class and changed the Message<String> parameter in void listen(){...} to Message<StaffMember>. Generally amended the class code as follows:

    package dan.teachingagency.covernotices;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.common.schema.SchemaType;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.pulsar.annotation.PulsarListener;

    @SpringBootApplication
    public class CovernoticesApplication {
        public static void main(String[] args) {
            SpringApplication.run(CovernoticesApplication.class, args);
        }

        private int messageCount = 0;

        @Autowired
        public TeacherRepository teacherRepository;

        @PulsarListener(
                subscriptionName = "TeacherAdminSub",
                topics = "persistent://school1/admin_topics/userdetails",
                schemaType = SchemaType.JSON)
        void listen(Message<StaffMember> message) {
            messageCount++;
            StaffMember staffMember = message.getValue();
            teacherRepository.save(staffMember);
            System.out.println("["+new java.util.Date(System.currentTimeMillis())+"] Received message ["+messageCount+"] with data received: ["+staffMember+"]");
        }
    }

Other than corruption on the broker, the only other possibility I can think of is the Pulsar annotations I removed since they are the only other significant change I made to the code. I had previously tried: void listen(Message){...} and it also locked in Message.getValue() so I don't think having it as 'String message' or Message was the problem.

Dan
  • Thanks for the details, Dan. Glad it is now working. Not knowing 100% why is, of course, always a bit frustrating. On the `sometimes messages would keep getting re-delivered and other times, they would behave as expected at first then suddenly all get re-delivered again`: it is possible that, if the listen method was blocking, the message ack would time out and the message would therefore be redelivered once the ack timeout expired. I will try to reproduce it on my end as well and will comment back with what I find. – onobc Jun 17 '23 at 03:12
  • I was unable to reproduce this. If you find it happens again, please file an issue on the [Spring Pulsar Github project](https://github.com/spring-projects/spring-pulsar/issues/new) and attach a small sample app that reproduces the error. Thanks – onobc Jun 20 '23 at 03:38
  • I will do. It hasn't recurred and I think I changed too much at once to know what fixed it. – Dan Jun 21 '23 at 08:24
  • I believe the issue was using the wrong schema type for the messages in the `void listen(Message<String> message)` method, and changing it to the proper JSON object type `void listen(Message<StaffMember> message)` resolved the issue. This is due to the fact that Pulsar Consumers are schema-aware and can only consume messages that can be deserialized into the proper type. – David Kjerrumgaard Jul 05 '23 at 18:24
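
To make the redelivery behaviour discussed in the comments above more concrete, here is a toy model in plain Java. It is not Pulsar's implementation and uses no Pulsar classes: `ToySubscription`, `publish`, `deliverAll` and `backlogSize` are hypothetical names, and a handler that throws stands in for a listener that blocks and never gets as far as acknowledging. The only point it illustrates is that an unacknowledged message stays in the subscription backlog and is handed out again later.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Toy model only: illustrates at-least-once delivery, not Pulsar's actual implementation.
class ToySubscription {
    private final Deque<String> backlog = new ArrayDeque<>();

    void publish(String message) {
        backlog.addLast(message);
    }

    // Delivers every message currently in the backlog once. A message is dropped
    // (treated as acknowledged) only if the handler returns normally; otherwise
    // it is put back and will be delivered again on the next pass.
    List<String> deliverAll(Consumer<String> handler) {
        List<String> delivered = new ArrayList<>();
        int pending = backlog.size();
        for (int i = 0; i < pending; i++) {
            String message = backlog.pollFirst();
            delivered.add(message);
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                backlog.addLast(message); // not acknowledged: keep for redelivery
            }
        }
        return delivered;
    }

    int backlogSize() {
        return backlog.size();
    }

    public static void main(String[] args) {
        ToySubscription sub = new ToySubscription();
        sub.publish("teacher-1");
        sub.publish("teacher-2");

        // Failing handler: nothing is acknowledged, so both messages remain.
        sub.deliverAll(m -> { throw new IllegalStateException("handler failed"); });
        System.out.println("Backlog after failing handler: " + sub.backlogSize()); // prints 2

        // Working handler: both messages are redelivered and then acknowledged.
        sub.deliverAll(m -> { });
        System.out.println("Backlog after working handler: " + sub.backlogSize()); // prints 0
    }
}
```

In this model, restarting the application corresponds to calling `deliverAll` again, which is consistent with messages reappearing on every startup until the listener completes normally.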