I am trying to use Supplier/Consumer to produce and consume messages from a Kinesis data stream. Is there a way to set the partition key dynamically?

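    // Queue that hands messages from the gRPC endpoint to the Supplier binding.
    private final BlockingQueue<Message<String>> messages = new LinkedBlockingQueue<>();

    @Bean
    public Supplier<Message<String>> produceMessages() {
        // poll() returns null when the queue is empty, so nothing is sent on that invocation.
        return () -> this.messages.poll();
    }

    @Override
    public void produce(main.Test request, StreamObserver<Test> response) {
        // Carry the desired partition key in a header so the expression below can read it.
        Message<String> input = MessageBuilder.withPayload(request.getMessage())
                .setHeader("partitionKey", "los").build();
        this.messages.offer(input);
        response.onCompleted();
    }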

application.properties

spring.cloud.stream.bindings.produceMessages-out-0.producer.partitionKeyExpression=headers['partitionKey']
Kanika

1 Answer

We don't know what `channel1` is, but according to the Spring Cloud Stream docs the property has to look like this:

spring.cloud.stream.bindings.produceMessages-out-0.producer.partitionKeyExpression='some-key'

https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/spring-cloud-stream.html#_producer_properties

You can find a sample here: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/kinesis-samples/kinesis-produce-consume
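
For readers wondering how the value of `partitionKeyExpression` turns into a partition number: the Spring Cloud Stream reference describes the default selection as `hashCode(key) % partitionCount`, with `partitionCount` defaulting to 1. The following is a minimal plain-Java sketch of that formula only, not the framework's or the Kinesis binder's actual code; the class and method names are hypothetical. Whether this is what produces the "0" reported in the comments is not confirmed in this thread.

```java
import java.util.List;

class PartitionIndexSketch {

    // Hypothetical helper mirroring the documented default:
    // hashCode(key) % partitionCount.
    static int selectPartition(Object key, int partitionCount) {
        int hash = key.hashCode();
        // Math.abs(Integer.MIN_VALUE) is still negative, so special-case it.
        if (hash == Integer.MIN_VALUE) {
            hash = 0;
        }
        return Math.abs(hash) % partitionCount;
    }

    public static void main(String[] args) {
        // The keys are illustrative; "los" is the header value from the question.
        for (String key : List.of("los", "abc", "xyz")) {
            System.out.println(key
                    + " -> partitionCount=1: " + selectPartition(key, 1)
                    + ", partitionCount=4: " + selectPartition(key, 4));
        }
    }
}
```

With a partition count of 1, every key maps to index 0 under this formula, so the `partitionCount` producer property on the `produceMessages-out-0` binding may be worth checking alongside the expression itself.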

Artem Bilan
  • Yes, this works when the partition key can be the same for all messages coming out of produceMessages. But what if the partition key has to be set based on some parameter in the message, so that it is not the same for all messages? – Kanika Jan 24 '23 at 15:47
  • Since that is an expression it can get access to `headers` and `payload` properties of the `Message` produced by your `Supplier`. – Artem Bilan Jan 24 '23 at 16:35
  • Hey, sorry, let me rephrase my question. I have updated the code above. Based on the expression, the data should be published under the partition key "los". But irrespective of the partition key given, the data is always published under partition key "0" for me. – Kanika Jan 24 '23 at 17:08
  • That's not how it works in Kinesis. Please learn what a shard is and how it is chosen by the Kinesis producer: https://docs.aws.amazon.com/streams/latest/dev/key-concepts.html#shard – Artem Bilan Jan 24 '23 at 17:18
  • Okay, so when the same partition key is passed, the MD5 hash is the same and those records go into the same shard. Please let me know if my understanding is wrong. I also want to ask whether this is related to https://stackoverflow.com/questions/49253112/how-to-set-partitionkey-when-using-spring-cloud-stream-kinesis – Kanika Jan 24 '23 at 17:29
  • Correct. For the same key the hash is always the same, so the shard is always the same too. – Artem Bilan Jan 24 '23 at 17:39
  • Got it. So the partition key should differ based on what I pass in the headers, right? When I view in Kinesis Data Viewer, all of my messages are visible under partition key "0" under the same shard even though I gave different partition keys. – Kanika Jan 24 '23 at 17:50
  • Well, I cannot say at the moment why that is, but it would be great if you could debug `KinesisMessageHandler.buildPutRecordRequest()` to see what `this.partitionKeyExpression` is producing for you. – Artem Bilan Jan 24 '23 at 18:15
  • Sure, will do that. Thanks! – Kanika Jan 25 '23 at 01:24
  • `this.partitionKeyExpression` returns a function expression, and its `getValue` method always returns 0. I am not sure whether the function expression is being constructed correctly for my use case. – Kanika Jan 25 '23 at 08:52
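
The shard selection discussed in the comments can be illustrated with a small sketch. Kinesis maps each partition key to a 128-bit integer using MD5 and places the record in the shard whose hash key range contains that value. The code below assumes UTF-8 encoding of the key and, purely for illustration, shards that split the hash key space evenly; real streams can have uneven ranges after resharding. The class and method names are hypothetical and this is not the Kinesis client's code.

```java
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class ShardHashSketch {

    // Size of the 128-bit hash key space: 2^128.
    static final BigInteger KEY_SPACE = BigInteger.ONE.shiftLeft(128);

    // MD5 of the key bytes, read as an unsigned 128-bit integer.
    // ASSUMPTION: the key is encoded as UTF-8 before hashing.
    static BigInteger hashKey(String partitionKey) {
        try {
            byte[] digest = MessageDigest.getInstance("MD5")
                    .digest(partitionKey.getBytes(StandardCharsets.UTF_8));
            return new BigInteger(1, digest);
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("MD5 is not available", e);
        }
    }

    // ASSUMPTION: shardCount shards divide the key space into equal ranges.
    static int shardIndex(String partitionKey, int shardCount) {
        return hashKey(partitionKey)
                .multiply(BigInteger.valueOf(shardCount))
                .divide(KEY_SPACE)
                .intValueExact();
    }

    public static void main(String[] args) {
        // Same key, same hash, same shard; different keys may or may not collide.
        for (String key : new String[] {"los", "los", "0"}) {
            System.out.println(key + " -> hash " + hashKey(key).toString(16)
                    + " -> shard " + shardIndex(key, 4));
        }
    }
}
```

The key itself is stored with the record unchanged; only its hash decides the shard. Seeing the literal key "0" in the data viewer therefore points at the value the expression produces, not at shard selection.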