I am trying to implement a simple messaging queue using Spring Boot and Apache Pulsar. The system works fine when working with String datatype but not for a Custom Object. Below are the object I'm using as well as the producer and consumer:
Custom Object:
package com.example.springpulsar;
public class User {
private String email;
private String firstName;
// constructors, getters and setters
}
Producer:
package com.example.springpulsar;
// imports
@Component
public class PulsarProducer {
@Autowired
private PulsarTemplate<User> template;
private static final String USER_TOPIC = "user-topic";
public void sendMessageToPulsarTopic(User user) throws PulsarClientException {
template.send(USER_TOPIC, user);
}
}
Consumer:
package com.example.springpulsar;
// imports
@Service
public class PulsarConsumer {
private static final String USER_TOPIC = "user-topic";
private final Logger LOGGER = LoggerFactory.getLogger(PulsarConsumer.class);
@PulsarListener(
subscriptionName = "user-topic-subscription",
topics = USER_TOPIC,
subscriptionType = SubscriptionType.Shared,
schemaType = SchemaType.JSON
)
public void userTopicListener(User user) {
LOGGER.info("Received user object with email: {}", user.getEmail());
}
}
Gradle dependency: org.springframework.pulsar:spring-pulsar-spring-boot-starter:0.2.0
Application properties:
spring:
pulsar:
defaults:
type-mappings:
- message-type: com.example.springpulsar.User
schema-info:
schema-type: JSON
I read somewhere that adding the schemaType in the Listener annotation should work, but it hasn't. Debug logs:
2023-06-20T14:41:24.050+05:30 DEBUG 32777 --- [nio-8085-exec-5] o.a.p.c.impl.BatchMessageContainerImpl : [user-topic] [null] add message to batch, num messages in batch so far 0
2023-06-20T14:41:24.053+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl : [user-topic] [standalone-1-21] Sending message cnx org.apache.pulsar.client.impl.ClientCnx@6f18e7a7, sequenceId 1
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.p.common.protocol.PulsarDecoder : [localhost/127.0.0.1:6650] Received cmd SEND_RECEIPT
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx : [id: 0xbf5c5f2c, L:/127.0.0.1:48676 - R:localhost/127.0.0.1:6650] Got receipt for producer: 0 -- msg: 1 -- id: 22:42
2023-06-20T14:41:24.056+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ProducerImpl : [user-topic] [standalone-1-21] Received ack for msg 1
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.p.common.protocol.PulsarDecoder : [localhost/127.0.0.1:6650] Received cmd MESSAGE
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.apache.pulsar.client.impl.ClientCnx : [id: 0xbf5c5f2c, L:/127.0.0.1:48676 - R:localhost/127.0.0.1:6650] Received a message from the server: org.apache.pulsar.common.api.proto.CommandMessage@697f429b
2023-06-20T14:41:24.057+05:30 DEBUG 32777 --- [r-client-io-1-1] o.a.pulsar.client.impl.ConsumerImpl : [user-topic][user-topic-subscription] Received message: 22/42
I do see that the ConsumerImpl
receives the message, but the listener method is never called.
Any help is appreciated. Thank you!