
I am working on a project that reads from our existing Elasticsearch instance and produces messages to Pulsar. If I do this in a highly multithreaded way without any explicit synchronization, I get many occurrences of the following log line:

    Message with sequence id X might be a duplicate but cannot be determined at this time.

That is produced from this line of code in the Pulsar Java client: https://github.com/apache/pulsar/blob/a4c3034f52f857ae0f4daf5d366ea9e578133bc2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L653

When I add a synchronized block to my method, synchronizing on the PulsarTemplate, the log lines disappear, but my publish rate drops substantially.

Here is the current working implementation of my method that sends Protobuf messages to Pulsar:

    public <T extends GeneratedMessageV3> CompletableFuture<MessageId> persist(T o) {
        var descriptor = o.getDescriptorForType();
        PulsarPersistTopicSettings settings = pulsarPersistConfig.getSettings(descriptor);
        // Fall back to the default message builder when none is configured for this message type
        MessageBuilder<T> messageBuilder = Optional.ofNullable(pulsarPersistConfig.getMessageBuilder(descriptor))
                .orElse(DefaultMessageBuilder.DEFAULT_MESSAGE_BUILDER);
        Optional<ProducerBuilderCustomizer<T>> producerBuilderCustomizerOpt =
                Optional.ofNullable(pulsarPersistConfig.getProducerBuilder(descriptor));

        // Building the message happens outside the lock; only sendAsync() is serialized
        PulsarOperations.SendMessageBuilder<T> sendMessageBuilder = pulsarTemplate.newMessage(o)
                .withSchema(Schema.PROTOBUF_NATIVE(o.getClass()))
                .withTopic(settings.getTopic());
        producerBuilderCustomizerOpt.ifPresent(sendMessageBuilder::withProducerCustomizer);
        sendMessageBuilder.withMessageCustomizer(mb -> messageBuilder.applyMessageBuilderKeys(o, mb));

        // Workaround: serializing sendAsync() suppresses the duplicate-sequence-id log lines,
        // at the cost of publish throughput
        synchronized (pulsarTemplate) {
            try {
                return sendMessageBuilder.sendAsync();
            } catch (PulsarClientException re) {
                throw new PulsarPersistException(re);
            }
        }
    }

The original version of the above method did not have the synchronized(pulsarTemplate) { ... } block. It performed faster, but generated a lot of logs about duplicate messages, which I knew to be incorrect. Adding the synchronized block got rid of the log messages, but slowed down publishing.
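To make the two log lines concrete, here is a toy model in plain Java (no Pulsar classes; every name in it is hypothetical). It assumes, from reading the linked ProducerImpl lines, that both messages fire when a message is pushed with a sequence id no greater than the last one already pushed. If assigning the id and pushing the message are separate steps, another thread can slip in between them; holding one lock across both steps, which is roughly what synchronizing around sendAsync() does from the outside, removes that interleaving. It illustrates the symptom only and is not a diagnosis of the actual client bug.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

/**
 * Toy model, not Pulsar code: each send first takes a sequence id, then
 * "pushes" the message. A push whose id is not greater than the last id
 * already pushed is counted as a suspected duplicate.
 */
public class SequenceOrderModel {
    private final AtomicLong nextSequenceId = new AtomicLong();
    private long lastSequenceIdPushed = -1; // guarded by this
    private long suspectedDuplicates = 0;   // guarded by this

    /** Id assignment and push are separate steps; other threads can run in between. */
    public void sendUnsynchronized() {
        long id = nextSequenceId.getAndIncrement();
        push(id);
    }

    /** Both steps run under one lock, so ids are always pushed in increasing order. */
    public synchronized void sendSynchronized() {
        long id = nextSequenceId.getAndIncrement();
        push(id);
    }

    private synchronized void push(long id) {
        if (id <= lastSequenceIdPushed) {
            suspectedDuplicates++; // stands in for the "might be a duplicate" log line
        } else {
            lastSequenceIdPushed = id;
        }
    }

    public synchronized long suspectedDuplicates() {
        return suspectedDuplicates;
    }

    /** Sends `messages` messages from `threads` threads and returns how many were flagged. */
    public static long run(boolean synchronizedSend, int threads, int messages) {
        SequenceOrderModel model = new SequenceOrderModel();
        Runnable send = synchronizedSend ? model::sendSynchronized : model::sendUnsynchronized;
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int i = 0; i < messages; i++) {
            pool.execute(send);
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(1, TimeUnit.MINUTES)) {
                throw new IllegalStateException("sends did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return model.suspectedDuplicates();
    }

    public static void main(String[] args) {
        System.out.println("synchronized:   " + run(true, 8, 200_000));
        System.out.println("unsynchronized: " + run(false, 8, 200_000));
    }
}
```

The synchronized variant always reports zero; the unsynchronized count is timing-dependent and can be non-zero on a multi-core machine.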

What are the best practices for multithreaded access to the PulsarTemplate? Is there a better way to achieve very high throughput message publishing?
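One pattern for high-throughput publishing without a global lock is to let every thread call the async send concurrently and only bound how many sends are outstanding. Below is a minimal, hypothetical wrapper built on java.util.concurrent.Semaphore; the send function is a parameter, so it could be a plain Pulsar producer's sendAsync or a method like persist(...) above. The Pulsar ProducerBuilder already offers maxPendingMessages and blockIfQueueFull for this, so a caller-side cap like this mainly helps when sends go through another layer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;
import java.util.function.Function;

/**
 * Hypothetical wrapper: any number of threads may publish concurrently (no
 * global lock), but at most maxInFlight sends are outstanding at once, so a
 * fast reader cannot queue unbounded work ahead of the broker.
 */
public final class BoundedAsyncPublisher<T, R> {
    private final Function<T, CompletableFuture<R>> send;
    private final Semaphore permits;

    public BoundedAsyncPublisher(Function<T, CompletableFuture<R>> send, int maxInFlight) {
        this.send = send;
        this.permits = new Semaphore(maxInFlight);
    }

    /** Blocks only while maxInFlight sends are outstanding, then returns the send's future. */
    public CompletableFuture<R> publish(T message) throws InterruptedException {
        permits.acquire();
        CompletableFuture<R> future;
        try {
            future = send.apply(message);
        } catch (RuntimeException e) {
            permits.release(); // the send never started, so give the permit back
            throw e;
        }
        // Free the slot when the send completes, whether it succeeded or failed
        return future.whenComplete((result, error) -> permits.release());
    }

    public int availablePermits() {
        return permits.availablePermits();
    }
}
```

Releasing the permit in whenComplete means a failed send frees its slot just like a successful one, and callers still receive the original result or exception.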

Should I look at using the reactive client instead?

EDIT: I've updated the code block to show the minimum synchronization needed to avoid the log lines, which is synchronizing only around the .sendAsync() call.
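If some serialization is needed until the client fix ships, the lock can be narrowed further than the whole template. Here is a minimal sketch with one lock per topic (PerTopicLocks and LockedAction are hypothetical names), assuming the race only involves sends through the same producer, i.e. the same topic; threads publishing to different topics then never wait on each other.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/**
 * Hypothetical stop-gap: one lock object per topic instead of one lock on the
 * shared template. Sends to the same topic are serialized; sends to different
 * topics run in parallel.
 */
public final class PerTopicLocks {
    /** A unit of work that may throw a checked exception (e.g. PulsarClientException). */
    @FunctionalInterface
    public interface LockedAction<R, E extends Exception> {
        R run() throws E;
    }

    // ConcurrentHashMap.computeIfAbsent is atomic, so all callers for a topic share one lock object
    private final ConcurrentMap<String, Object> locks = new ConcurrentHashMap<>();

    /** Runs the action while holding the lock for the given topic and returns its result. */
    public <R, E extends Exception> R withTopicLock(String topic, LockedAction<R, E> action) throws E {
        Object lock = locks.computeIfAbsent(topic, t -> new Object());
        synchronized (lock) {
            return action.run();
        }
    }
}
```

In persist(...), the synchronized block could then become something like `return topicLocks.withTopicLock(settings.getTopic(), sendMessageBuilder::sendAsync);` inside the existing try/catch, where topicLocks is a hypothetical PerTopicLocks field; the checked PulsarClientException still propagates to that catch. The map holds one small object per topic, which stays bounded as long as the set of topics is.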

Jeff
  • This log is at INFO level. Apart from the noise, are you seeing any other negative effect (messages rejected, lost, ...) ? – Christophe Bornet Feb 02 '23 at 17:46
  • I don't know. That is what I'm trying to verify. I get the log line hundreds of thousands of times during a pre-prod scale run. Looking through the logs, I also see some occurrences of the other log line, a warning that a record is definitely a duplicate (when I know it isn't). Most importantly, this behavior isn't present when running single-threaded, so the core of the question is really whether the PulsarTemplate truly is thread-safe, or whether I need to treat it as non-thread-safe. – Jeff Feb 02 '23 at 23:00
  • Could you make a test using the Pulsar client directly instead of Spring-Pulsar ? – Christophe Bornet Feb 06 '23 at 09:50
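A harness for that comparison could look like the following minimal sketch (ConcurrentSendHarness is a hypothetical name). It drives one async send function from many threads and waits for every returned future, so the same run can be pointed first at a plain Pulsar Producer<String> via producer::sendAsync and then at the Spring Pulsar path, to see which layer produces the log lines.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

/** Hypothetical harness: calls one async send function concurrently from many threads. */
public final class ConcurrentSendHarness {
    private ConcurrentSendHarness() {}

    /** Sends threads * messagesPerThread unique payloads and returns how many futures failed. */
    public static long run(Function<String, ? extends CompletableFuture<?>> send,
                           int threads, int messagesPerThread) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<CompletableFuture<List<CompletableFuture<?>>>> workers = new ArrayList<>();
            for (int t = 0; t < threads; t++) {
                final int worker = t;
                workers.add(CompletableFuture.supplyAsync(() -> {
                    List<CompletableFuture<?>> sent = new ArrayList<>();
                    for (int i = 0; i < messagesPerThread; i++) {
                        // Unique payload per message, so real duplicates are easy to spot downstream
                        sent.add(send.apply("worker-" + worker + "-msg-" + i));
                    }
                    return sent;
                }, pool));
            }
            long failures = 0;
            for (CompletableFuture<List<CompletableFuture<?>>> w : workers) {
                // w.join() rethrows if send.apply itself threw, which aborts the run
                for (CompletableFuture<?> f : w.join()) {
                    try {
                        f.join();
                    } catch (CompletionException e) {
                        failures++;
                    }
                }
            }
            return failures;
        } finally {
            pool.shutdown();
        }
    }
}
```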

2 Answers


Your usage without the synchronized block should work. I will look into it to see whether anything else is going on. In the meantime, it would be great if you could give the Reactive client a try.

onobc
  • Should work meaning what? That there is no impact to data integrity? That even though I am hitting that log line all of the data is properly sent to Pulsar? I am getting hundreds of thousands of instances of that log message. – Jeff Feb 02 '23 at 17:27
  • Additionally, I sometimes get the warning log line here: https://github.com/apache/pulsar/blob/a4c3034f52f857ae0f4daf5d366ea9e578133bc2/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java#L651 Taken with the fact that if I synchronize the `messageBuilder.sendAsync()` call I am not seeing any of those log lines, seems to indicate to me that there is something going on in the PulsarTemplate that is not truly threadsafe when used in the way that I am. – Jeff Feb 02 '23 at 23:01

This issue was initially tracked here, and the final resolution was that it has been fixed in Pulsar 2.11.

Please try updating to Pulsar 2.11.

David Kjerrumgaard
  • I created the Pulsar issue. As you can see in the comment on my original ticket (https://github.com/apache/pulsar/issues/19457#issuecomment-1423392356), it is reproducible on 2.11, but not on HEAD. As such, the hope is that the next release will contain the fix. – Jeff Feb 17 '23 at 19:32