I am using spring boot to connect to a Pulsar message broker but I cannot access the message payload no matter what class I use in the call to:
//...
void listen(<T> message)
//...
I'm using Intellij 2023.1 (Community edition)
The full class code is:
package dan.teachingagency.covernotices;
import com.fasterxml.jackson.databind.ObjectMapper;
import dan.teachingagency.Exception.MapperException;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.common.schema.SchemaType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.pulsar.annotation.EnablePulsar;
import org.springframework.pulsar.annotation.PulsarListener;
import org.springframework.pulsar.listener.AckMode;
import org.springframework.stereotype.Service;
@Service
@SpringBootApplication
@EnablePulsar
public class CovernoticesApplication {
public static void main(String[] args) {
SpringApplication.run(CovernoticesApplication.class, args);
}
//This was for my debugging purposes.
private int messageCount = 0;
@Autowired
public TeacherRepository teacherRepository;
ObjectMapper mapper = new ObjectMapper();
@PulsarListener(
subscriptionName = "TeacherAdminSub",
topics = "persistent://school1/admin_topics/userdetails",
schemaType = SchemaType.JSON,
ackMode = AckMode.RECORD)
void listen(Message<String> message) {
//Initially messages weren't arriving. Then they kept getting re-delivered every time
//I started the application until I changed the ackMode as shown
messageCount++;
System.out.println("["+new java.util.Date(System.currentTimeMillis())+"] Message ["+messageCount+"]received.");
//If I uncomment this code (which has a call to the Spring Boot message implementation, I think
//the call deadlocks because the line of code does not appear to run; nothing is output.
//System.out.println("["
// +new java.util.Date(System.currentTimeMillis())
// +"] Message received ["
// +this.messageCount+"]: ["
//+message.getPayload()+"].");
//This is the equivalent in the Pulsar API implementation. It also does not run.
//System.out.println("Message value: ["+message.getValue()+"]");
/*Map<String,String> messageProperties = message.getProperties();
String messageType = messageProperties.get("TYPE");
System.out.println("Message type: ["+messageType+"]");
if("TEACHER".equals(messageType)) {
System.out.println("Saving message now. ["+message.getValue()+"]");
StaffMember teacher = fromJson(message.getValue(), StaffMember.class);
System.out.println("Teacher=["+teacher+"]");
teacherRepository.save(teacher);
System.out.println("Saved message:[" + message.getValue() + "]");
} else {
System.out.println("Received rogue message ["+message.getValue()+"]");
}*/
}
/**
* Convert json to Object
* @param json String json value
* @param clazz Instances of the class
* @param <T> Object Class
* @return Object class
*/
private <T > T fromJson(String json, Class < T > clazz) {
try {
return mapper.readValue(json, clazz);
} catch (Exception e) {
throw new MapperException(e.getMessage());
}
}
}
I have tried: Googling for similar problems Inserting debugging statements Swapping the Message API implementation between Spring Boot and Pulsar APIs Turning on DEBUG mode to see if any 'hidden' exceptions are being thrown (none are)