I am subscribing to three different topics through the GCP Pub/Sub API.
Below is the configuration of the three subscribers.
Subscriber 1, UserSubscriberConfig.java:
    @Bean
    public MessageChannel userPubSubInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter userMessageChannelAdapter(
            @Qualifier("userPubSubInputChannel") MessageChannel userPubSubInputChannel,
            @Qualifier("pubSubTemplate") PubSubTemplate pubSubTemplate) {
        pubSubTemplate.setMessageConverter(new JacksonPubSubMessageConverter(getObjectMapper()));
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "subscription_name");
        adapter.setOutputChannel(userPubSubInputChannel);
        adapter.setPayloadType(UserChangeEvent.class);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    private ObjectMapper getObjectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        return objectMapper;
    }
Subscriber 2, ProductSubscriberConfig.java:
    @Bean
    public MessageChannel productPubSubInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter productMessageChannelAdapter(
            @Qualifier("productPubSubInputChannel") MessageChannel productPubSubInputChannel,
            @Qualifier("pubSubTemplate") PubSubTemplate pubSubTemplate) {
        pubSubTemplate.setMessageConverter(new JacksonPubSubMessageConverter(getObjectMapper()));
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "subscriber_name");
        adapter.setOutputChannel(productPubSubInputChannel);
        adapter.setPayloadType(ProductChangeEvent.class);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

    private ObjectMapper getObjectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        return objectMapper;
    }
Subscriber 3, AuditSubscriberConfig.java:
    @Bean
    public MessageChannel auditPubSubInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public PubSubInboundChannelAdapter auditMessageChannelAdapter(
            @Qualifier("auditPubSubInputChannel") MessageChannel auditPubSubInputChannel,
            @Qualifier("pubSubTemplate") PubSubTemplate pubSubTemplate) {
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "subscriber_name");
        adapter.setOutputChannel(auditPubSubInputChannel);
        adapter.setPayloadType(String.class); // need changes
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }
I also have three different consumer classes. The one below is for the audit event (all three look similar; only the inputChannel differs):
    @ServiceActivator(inputChannel = "auditPubSubInputChannel")
    public void messageReceiver(final Message<String> message) {
        try {
            String msg = message.getPayload();
            log.info(msg);
            ack(message);
        } catch (Exception e) {
            // Any failure while handling the payload: nack so the message is redelivered.
            nack(message);
        }
    }
Since user_topic and product_topic publish JSON responses, those two subscribers work perfectly fine, because I have a converter for these two subscriptions: pubSubTemplate.setMessageConverter(new JacksonPubSubMessageConverter(getObjectMapper()));
But audit_topic publishes a String response, and that subscriber throws the error below:
gcp-pubsub-subscriber3 trace: [WARN ] MessageDispatcher$AckHandler - MessageReceiver failed to process ack ID: PjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIDBGd5PhpGHg4HXFx1B1EMEXd-NHJqT1cPkw9bSImhBWen4_L43Iv2docZoZiQ9WhJLLD5-IzZFQV5AEsjhdakjJHKJHJHjshadkjHHJHJ, the message will be nacked.
com.google.cloud.spring.pubsub.support.converter.PubSubMessageConversionException: JSON deserialization of an object of type java.lang.String failed.; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'audit_number_1001': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
at [Source: (byte[])"audit_number_1001"; line: 1, column: 14]
at com.google.cloud.spring.pubsub.support.converter.JacksonPubSubMessageConverter.fromPubSubMessage(JacksonPubSubMessageConverter.java:67)
at com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:183)
at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:396)
at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:63)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
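One possible explanation, which I have not confirmed, is that all three adapters receive the same pubSubTemplate bean, so the Jackson converter set in the user and product configs is also used for the audit subscription (the stack trace does show JacksonPubSubMessageConverter handling the String payload). Below is a minimal plain-Java sketch of that suspicion. It does not use Spring or the Pub/Sub client: Template, strictJson and receive are hypothetical stand-ins I made up for illustration, not Spring Cloud GCP classes.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Plain-Java illustration only: Template is a hypothetical stand-in for a shared
// singleton bean such as pubSubTemplate; none of this is Spring Cloud GCP API.
final class SharedConverterSketch {

    static final class Template {
        // Default converter: treat the payload as plain UTF-8 text.
        private Function<byte[], String> converter =
                bytes -> new String(bytes, StandardCharsets.UTF_8);

        void setConverter(Function<byte[], String> converter) {
            this.converter = converter;
        }

        String convert(byte[] payload) {
            return converter.apply(payload);
        }
    }

    // Toy "JSON-only" converter: rejects anything that does not start
    // like a JSON object or a quoted JSON string.
    static String strictJson(byte[] bytes) {
        String text = new String(bytes, StandardCharsets.UTF_8).trim();
        if (!(text.startsWith("{") || text.startsWith("\""))) {
            throw new IllegalArgumentException("Unrecognized token '" + text + "'");
        }
        return text;
    }

    // Returns the converted payload, or "FAILED: <reason>" if conversion throws.
    static String receive(Template template, String payload) {
        try {
            return template.convert(payload.getBytes(StandardCharsets.UTF_8));
        } catch (IllegalArgumentException e) {
            return "FAILED: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // One shared instance, as with a singleton bean injected into every config class.
        Template shared = new Template();
        shared.setConverter(SharedConverterSketch::strictJson); // done by the user/product config

        System.out.println(receive(shared, "{\"id\":1}"));         // JSON payload
        System.out.println(receive(shared, "audit_number_1001")); // plain text through the same instance

        // A separate instance for the audit subscription keeps the plain-text default.
        Template auditOnly = new Template();
        System.out.println(receive(auditOnly, "audit_number_1001"));
    }
}
```

If this is what is happening, the audit adapter would need its own template or converter instead of the shared one, but I am not sure how to wire that up.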
Can someone please advise how I can configure three different PubSubInboundChannelAdapter beans to receive JSON and String responses?
UserSubscriberConfig.java will receive a JSON response
ProductSubscriberConfig.java will receive a JSON response
AuditSubscriberConfig.java will receive a String response
Looking for a positive update. Thank you