
I am subscribing to three different topics through the GCP Pub/Sub API.

Below is the configuration of the three subscribers.

Subscriber 1, `UserSubscriberConfig.java`:

    @Bean
    public MessageChannel userPubSubInputChannel() {
        return new DirectChannel();
    }
     
    @Bean
    public PubSubInboundChannelAdapter userMessageChannelAdapter(@Qualifier("userPubSubInputChannel") MessageChannel userPubSubInputChannel,
            @Qualifier("pubSubTemplate") PubSubTemplate pubSubTemplate) {
     
        pubSubTemplate.setMessageConverter(new JacksonPubSubMessageConverter(getObjectMapper()));
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "subscription_name");
        adapter.setOutputChannel(userPubSubInputChannel);
        adapter.setPayloadType(UserChangeEvent.class);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }
    
    private ObjectMapper getObjectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        return objectMapper;
    }
        

Subscriber 2, `ProductSubscriberConfig.java`:

    @Bean
    public MessageChannel productPubSubInputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public PubSubInboundChannelAdapter productMessageChannelAdapter(@Qualifier("productPubSubInputChannel") MessageChannel productPubSubInputChannel,
            @Qualifier("pubSubTemplate") PubSubTemplate pubSubTemplate) {
     
        pubSubTemplate.setMessageConverter(new JacksonPubSubMessageConverter(getObjectMapper()));
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "subscriber_name");
        adapter.setOutputChannel(productPubSubInputChannel);
        adapter.setPayloadType(ProductChangeEvent.class);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }
    
    private ObjectMapper getObjectMapper() {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        return objectMapper;
    }

Subscriber 3, `AuditSubscriberConfig.java`:

    @Bean
    public MessageChannel auditPubSubInputChannel() {
        return new DirectChannel();
    }
    
    @Bean
    public PubSubInboundChannelAdapter auditMessageChannelAdapter(@Qualifier("auditPubSubInputChannel") MessageChannel auditPubSubInputChannel,
            @Qualifier("pubSubTemplate") PubSubTemplate pubSubTemplate) {
     
        PubSubInboundChannelAdapter adapter = new PubSubInboundChannelAdapter(pubSubTemplate, "subscriber_name");
        adapter.setOutputChannel(auditPubSubInputChannel);
        adapter.setPayloadType(String.class); //need changes
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }

I also have three consumer classes. The one below handles audit events (all three look similar; only the `inputChannel` differs):

    @ServiceActivator(inputChannel = "auditPubSubInputChannel")
    public void messageReceiver(final Message<String> message) {
        try {
            String msg = message.getPayload();
            log.info(msg);
            ack(message);   // ack/nack are helper methods not shown here
        } catch (Exception e) {
            nack(message);
        }
    }

Since user_topic and product_topic publish JSON, those two subscribers work perfectly, because I set a converter for them with `pubSubTemplate.setMessageConverter(new JacksonPubSubMessageConverter(getObjectMapper()));`. However, audit_topic publishes a plain String, and it throws the error below:

    gcp-pubsub-subscriber3 trace: [WARN ] MessageDispatcher$AckHandler - MessageReceiver failed to process ack ID: PjA-RVNEUAYWLF1GSFE3GQhoUQ5PXiM_NSAoRRIDBGd5PhpGHg4HXFx1B1EMEXd-NHJqT1cPkw9bSImhBWen4_L43Iv2docZoZiQ9WhJLLD5-IzZFQV5AEsjhdakjJHKJHJHjshadkjHHJHJ, the message will be nacked.
    com.google.cloud.spring.pubsub.support.converter.PubSubMessageConversionException: JSON deserialization of an object of type java.lang.String failed.; nested exception is com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'audit_number_1001': was expecting (JSON String, Number, Array, Object or token 'null', 'true' or 'false')
     at [Source: (byte[])"audit_number_1001"; line: 1, column: 14]
    at com.google.cloud.spring.pubsub.support.converter.JacksonPubSubMessageConverter.fromPubSubMessage(JacksonPubSubMessageConverter.java:67)
    at com.google.cloud.spring.pubsub.core.subscriber.PubSubSubscriberTemplate.lambda$subscribeAndConvert$1(PubSubSubscriberTemplate.java:183)
    at com.google.cloud.pubsub.v1.MessageDispatcher$4.run(MessageDispatcher.java:396)
    at org.springframework.cloud.sleuth.instrument.async.TraceRunnable.run(TraceRunnable.java:63)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)

Can someone please advise how I can configure three different `PubSubInboundChannelAdapter`s so that they can receive both JSON and String responses?

UserSubscriberConfig.java will receive a JSON response, ProductSubscriberConfig.java will receive a JSON response, and AuditSubscriberConfig.java will receive a String response.

Looking forward to a positive update. Thank you.

Tech_sharma

1 Answer


The problem is that you are publishing the message as a plain String, but you configured your subscriber to read it as JSON:

    pubSubTemplate.setMessageConverter(new JacksonPubSubMessageConverter(getObjectMapper()));

Please either publish your messages in JSON format or remove the line above from your configuration. By default, the Spring GCP Pub/Sub library provides everything needed to handle plain-text String messages.
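One thing worth checking when removing that line: if several configuration classes inject the same `pubSubTemplate` bean, a converter set in any one of them applies to all of them. The toy model below illustrates that effect in plain Java. It does not use the Spring GCP API; `Template`, `strictJson` and `receiveAudit` are hypothetical stand-ins invented for this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

public class SharedTemplateDemo {

    /** Hypothetical stand-in for a template holding one mutable converter. */
    static class Template {
        // Default converter: decode the payload as plain UTF-8 text.
        private Function<byte[], Object> converter =
                bytes -> new String(bytes, StandardCharsets.UTF_8);

        void setMessageConverter(Function<byte[], Object> converter) {
            this.converter = converter;
        }

        Object convert(byte[] payload) {
            return converter.apply(payload);
        }
    }

    /** Hypothetical strict converter: rejects payloads that are not a JSON object. */
    static Object strictJson(byte[] payload) {
        String text = new String(payload, StandardCharsets.UTF_8).trim();
        if (!text.startsWith("{")) {
            throw new IllegalArgumentException("Unrecognized token '" + text + "'");
        }
        return text;
    }

    /** Simulates the audit subscriber receiving a plain-text payload. */
    static String receiveAudit(Template template) {
        byte[] payload = "audit_number_1001".getBytes(StandardCharsets.UTF_8);
        try {
            return "ok: " + template.convert(payload);
        } catch (IllegalArgumentException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // One shared instance: the "user" configuration installs the JSON converter...
        Template shared = new Template();
        shared.setMessageConverter(SharedTemplateDemo::strictJson);
        // ...and the "audit" subscriber, which never set a converter, is affected too.
        System.out.println(receiveAudit(shared));
        // prints "failed: Unrecognized token 'audit_number_1001'"

        // A dedicated instance keeps the default plain-text converter.
        System.out.println(receiveAudit(new Template()));
        // prints "ok: audit_number_1001"
    }
}
```

In terms of the question, this would mean the `setMessageConverter` call has to go from every class that shares the bean, or the String subscription needs a template of its own.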

jccampanero
  • Thank you for your response. I tried removing the line mentioned above, but it still throws the same error. – Tech_sharma Dec 26 '22 at 07:15
  • I found the root cause of the issue. I have three subscriber configurations: two receive a `Json` response and the other one receives a `String` response. The one receiving the String response fails. Please have a look at my updated question. – Tech_sharma Dec 26 '22 at 11:53
  • Sorry for the late reply @Tech_sharma. Your issue has to do with the fact that you are using the same `PubSubTemplate` for the three subscriptions, and you configured the associated `MessageConverter` as the Jackson JSON-based one. I think you need to set up at least two `PubSubTemplate`s, one configured to deal with JSON and another to handle plain `String` text messages. Please consider reading [this](https://stackoverflow.com/questions/61495154/how-to-subscribe-to-multiple-google-pubsub-projects-in-spring-gcp) related SO question, I think it could be of help. – jccampanero Jan 02 '23 at 19:52
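
The two-template idea from the last comment could look roughly like the sketch below. It is untested and rests on several assumptions: the class name `PubSubTemplateConfig` and the bean names `jsonPubSubTemplate` and `stringPubSubTemplate` are made up for illustration, the `PublisherFactory` and `SubscriberFactory` beans are assumed to come from the Spring Cloud GCP auto-configuration, and both templates are declared explicitly because the auto-configured `pubSubTemplate` bean most likely backs off once the application defines its own `PubSubTemplate`. Each template is built from the factories rather than from shared publisher/subscriber template beans so that the converters stay independent.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import com.google.cloud.spring.pubsub.integration.AckMode;
import com.google.cloud.spring.pubsub.integration.inbound.PubSubInboundChannelAdapter;
import com.google.cloud.spring.pubsub.support.PublisherFactory;
import com.google.cloud.spring.pubsub.support.SubscriberFactory;
import com.google.cloud.spring.pubsub.support.converter.JacksonPubSubMessageConverter;
import com.google.cloud.spring.pubsub.support.converter.SimplePubSubMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.MessageChannel;

@Configuration
public class PubSubTemplateConfig { // hypothetical class name

    // Template for the user and product subscriptions (JSON payloads).
    @Bean
    public PubSubTemplate jsonPubSubTemplate(PublisherFactory publisherFactory,
                                             SubscriberFactory subscriberFactory) {
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.registerModule(new JavaTimeModule());
        PubSubTemplate template = new PubSubTemplate(publisherFactory, subscriberFactory);
        template.setMessageConverter(new JacksonPubSubMessageConverter(objectMapper));
        return template;
    }

    // Template for the audit subscription (plain-text payloads).
    @Bean
    public PubSubTemplate stringPubSubTemplate(PublisherFactory publisherFactory,
                                               SubscriberFactory subscriberFactory) {
        PubSubTemplate template = new PubSubTemplate(publisherFactory, subscriberFactory);
        template.setMessageConverter(new SimplePubSubMessageConverter());
        return template;
    }

    // Would replace the adapter bean in AuditSubscriberConfig.java.
    @Bean
    public PubSubInboundChannelAdapter auditMessageChannelAdapter(
            @Qualifier("auditPubSubInputChannel") MessageChannel auditPubSubInputChannel,
            @Qualifier("stringPubSubTemplate") PubSubTemplate stringPubSubTemplate) {
        PubSubInboundChannelAdapter adapter =
                new PubSubInboundChannelAdapter(stringPubSubTemplate, "subscriber_name");
        adapter.setOutputChannel(auditPubSubInputChannel);
        adapter.setPayloadType(String.class);
        adapter.setAckMode(AckMode.MANUAL);
        return adapter;
    }
}
```

With this layout the user and product adapters would inject `@Qualifier("jsonPubSubTemplate")` and would no longer call `setMessageConverter` themselves. If anything else in the application injects a `PubSubTemplate` without a qualifier, one of the two beans would probably need `@Primary`.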