2

Here is the use case we need to implement.

First, our REST service needs to send a message to a Kafka topic; the downstream service will process it and publish the response on another topic.

For us this is a request-reply flow: each response must be matched to the request that caused it. Using ReplyingKafkaTemplate works fine, but it only lets us set the correlation id in the header.

The other service sends the message metadata as attributes in the message itself. Is there any way to map the correlation id between the request topic message and the reply topic message?

To explain better:

One microservice expects the payload shown below, with the correlationId in the payload.

{
  "operationDate": "2020-09-16T11:58:25",
  "correlationId": "-5544538377183901824042719876882142227",
  "birthDate": "2013-12-12",
  "firstNameEn": "boby",
  "firstNameAr": "الشيخ"
}

The microservice will process the payload and publish a response to another topic, as follows:

{
  "correlationId": -5544538377183901824042719876882142227,
  "consumerId": null,
  "userid": 123456,
  "statusCode": "SUCCESS",
  "errors": null
}

We need to implement this using Spring's ReplyingKafkaTemplate.

However, ReplyingKafkaTemplate only works with the correlationId in the header.

shuaib
  • Sorry, not clear what is an idea to carry a correlation id over in the request `ProducerRecord`? You can't use headers for that? Why? – Artem Bilan Sep 03 '20 at 15:51
  • See my answer; use a `correlationIdStrategy` `Function`. – Gary Russell Sep 03 '20 at 16:18
  • @ArtemBilan You are right, we should (or could) use the header, but the microservice we are calling only accepts the id in the payload, not the header. That is the problem: we need to put the same correlation id in the payload as well. – shuaib Sep 16 '20 at 12:41
  • @GaryRussell we need a way to add it to the payload, so that they process the message from the topic and publish to another Kafka topic with the same correlation id – shuaib Sep 16 '20 at 12:42
  • Much clearer. See the edit to my answer. – Gary Russell Sep 16 '20 at 13:01

2 Answers

1

Assuming you mean you want to include the topic(s) in the correlation id, see

/**
 * Set a function to be called to establish a unique correlation key for each request
 * record.
 * @param correlationStrategy the function.
 * @since 2.3
 */
public void setCorrelationIdStrategy(Function<ProducerRecord<K, V>, CorrelationKey> correlationStrategy) {

You can create your own correlation id, based on the ProducerRecord (which has the topic()).

You just need to make sure it is unique. If you manually set the KafkaHeaders.REPLY_TOPIC, it will be visible to the strategy.
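
As a minimal sketch of such a strategy, the helper below builds a correlation id from the record's topic plus a random UUID so that it stays unique. `TopicCorrelationIds` and `newId` are hypothetical names used only for illustration; the wiring into the template is shown as a comment and assumes the `CorrelationKey(byte[])` constructor.

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

// Hypothetical helper: not part of Spring Kafka.
final class TopicCorrelationIds {

    private TopicCorrelationIds() {
    }

    // Prefix a random UUID with the topic name; the UUID keeps the id unique.
    static byte[] newId(String topic) {
        return (topic + ":" + UUID.randomUUID()).getBytes(StandardCharsets.UTF_8);
    }
}

// Possible wiring (assumption, not compiled here):
// template.setCorrelationIdStrategy(
//         rec -> new CorrelationKey(TopicCorrelationIds.newId(rec.topic())));
```

The replying side has to echo the same bytes back in the correlation header for the match to succeed.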

EDIT

With the correlation id in the payload, use setCorrelationIdStrategy to extract the correlationId from the payload and add a RecordInterceptor to do the same on the reply side.
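
A rough sketch of the extraction step, assuming String (JSON) payloads like the ones in the question. Note that the request carries the id as a quoted string while the reply carries it as a bare number, so both sides must be normalized to the same bytes. `PayloadCorrelation` and its methods are hypothetical names, and the regex is a stand-in for a real JSON parser.

```java
import java.nio.charset.StandardCharsets;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: not part of Spring Kafka.
final class PayloadCorrelation {

    // Accepts both "correlationId": "-123" (request) and "correlationId": -123 (reply).
    private static final Pattern ID =
            Pattern.compile("\"correlationId\"\\s*:\\s*\"?(-?\\d+)\"?");

    private PayloadCorrelation() {
    }

    // Returns the id digits (with sign) regardless of quoting.
    static String extractCorrelationId(String json) {
        Matcher m = ID.matcher(json);
        if (!m.find()) {
            throw new IllegalArgumentException("no numeric correlationId in payload");
        }
        return m.group(1);
    }

    // Bytes to use as the correlation key on both the request and the reply side.
    static byte[] keyBytes(String json) {
        return extractCorrelationId(json).getBytes(StandardCharsets.UTF_8);
    }
}

// Request side (assumption, not compiled here):
// template.setCorrelationIdStrategy(
//         rec -> new CorrelationKey(PayloadCorrelation.keyBytes(rec.value())));
// Reply side: before the template sees the record, add
// PayloadCorrelation.keyBytes(rec.value()) as the KafkaHeaders.CORRELATION_ID header.
```

Because the key is derived from the payload alone, the other service never needs to read or write Kafka headers.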

Gary Russell
  • I thought his point was that they can't rely on this `headers.add(new RecordHeader(this.correlationHeaderName, correlationId.getCorrelationId()));`. Sounds like the other side can't read Kafka headers... – Artem Bilan Sep 03 '20 at 16:44
  • I agree; the question is not clear; let's wait for clarification :) – Gary Russell Sep 03 '20 at 16:52
  • Hi, the question was this: by default, Spring's ReplyingKafkaTemplate adds the correlationId to the message header, and if the same correlationId arrives at the reply container, it is matched to the original request. – shuaib Sep 16 '20 at 12:39
0

Thanks for the hint.

I did it by overriding doSend and copying the Kafka header correlationId into the payload.

@Override
protected ListenableFuture<SendResult> doSend(ProducerRecord producerRecord) {
    if (producerRecord.value() != null) {
        // append the header correlationId to the payload here
    }
    return super.doSend(producerRecord);
}

And in the reply-side onMessage, I copied the correlationId from the response payload into the header.

@Override
public void onMessage(List<ConsumerRecord<K, R>> data) {
    data.forEach(krConsumerRecord -> {
        // update each record's header with the correlationId from its payload
    });
    super.onMessage(data);
}

In this way I successfully integrated request-response semantics with the correlationId in the request and response payloads.
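
The conversion between the header bytes and the payload value is not shown above. One possible sketch follows, assuming the header id is a fixed 16-byte value and the payload carries its signed decimal form (the sample value in the question has that shape). `CorrelationIdCodec` is a hypothetical name, and the 16-byte length is an assumption about the default id.

```java
import java.math.BigInteger;
import java.util.Arrays;

// Hypothetical helper: not part of Spring Kafka.
final class CorrelationIdCodec {

    // Assumed length of the header correlation id.
    static final int ID_LENGTH = 16;

    private CorrelationIdCodec() {
    }

    // Header bytes -> decimal string to place in the request payload.
    static String toPayloadValue(byte[] headerId) {
        return new BigInteger(headerId).toString();
    }

    // Decimal string from the reply payload -> header bytes.
    // BigInteger drops redundant leading sign bytes, so pad back to ID_LENGTH.
    static byte[] toHeaderValue(String payloadValue) {
        byte[] minimal = new BigInteger(payloadValue).toByteArray();
        if (minimal.length > ID_LENGTH) {
            throw new IllegalArgumentException("value does not fit in " + ID_LENGTH + " bytes");
        }
        byte[] padded = new byte[ID_LENGTH];
        // Sign-extend: 0xFF for negative values, 0x00 otherwise.
        Arrays.fill(padded, (byte) (minimal[0] < 0 ? 0xFF : 0x00));
        System.arraycopy(minimal, 0, padded, ID_LENGTH - minimal.length, minimal.length);
        return padded;
    }
}
```

The padding matters because the reply is matched on the exact header bytes; an id that starts with 0x00 or 0xFF bytes would otherwise come back shorter and fail to correlate.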

shuaib