
Requirement: We need to consolidate all messages that have the same order ID and perform a subsequent operation on the consolidated message.

Explanation: The snippet of code below captures all order messages received from a particular tenant and consolidates them into a single order message after waiting for a specific period of time. It does the following:

  1. Repartition messages based on order ID, so each order message is keyed by its tenantId plus its order key.
  2. Perform a groupByKey operation followed by a windowed operation over 2 minutes.
  3. Perform a reduce operation once windowing is complete.
  4. Convert the KTable back to a stream and send its output to another Kafka topic.

Expected output: If 5 messages with the same order ID are sent within the window period, the final Kafka topic was expected to contain only one message, namely the result of the last reduce operation.

Actual output: All 5 messages appear, indicating that windowing is not completing before the reduce operation is invoked. Each message seen in Kafka does have the reduce operation applied correctly as every message is received.

Queries: In Kafka Streams library version 0.11.0.0, the reduce function used to accept a time window as an argument. I see that this is deprecated in Kafka Streams version 1.0.0. Is the windowing done in the piece of code below correct? Is windowing supported in the newer Kafka Streams 1.0.0 library? If so, is there anything that can be improved in the snippet below?
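For context on the deprecation, the windowed reduce moved from a `reduce(...)` overload to `windowedBy(...)` between the two versions. A sketch of the before/after, reusing the names from the snippet below (the store name `"order-window-store"` is an illustrative placeholder):

```java
// Kafka Streams 0.11.0.0 style (deprecated in 1.0.0):
// the window and a store name were passed directly to reduce().
KTable<Windowed<String>, OrderMsg> deprecatedStyle = orderMsgStream
        .groupByKey(Serialized.with(Serdes.String(), orderMsgSerde))
        .reduce(new OrderMsgReducer(), TimeWindows.of(windowTime), "order-window-store");

// Kafka Streams 1.0.0 style: the window is applied via windowedBy()
// on the grouped stream, and reduce() takes only the reducer.
KTable<Windowed<String>, OrderMsg> currentStyle = orderMsgStream
        .groupByKey(Serialized.with(Serdes.String(), orderMsgSerde))
        .windowedBy(TimeWindows.of(windowTime))
        .reduce(new OrderMsgReducer());
```

So windowing is still fully supported in 1.0.0; only the call site moved.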

        String orderMsgTopic = "sampleordertopic";

        JsonSerializer<OrderMsg> orderMsgJSONSerialiser = new JsonSerializer<>();
        JsonDeserializer<OrderMsg> orderMsgJSONDeSerialiser = new JsonDeserializer<>(OrderMsg.class);

        Serde<OrderMsg> orderMsgSerde = Serdes.serdeFrom(orderMsgJSONSerialiser,orderMsgJSONDeSerialiser);



        KStream<String, OrderMsg> orderMsgStream = this.builder
                .stream(orderMsgTopic, Consumed.with(Serdes.ByteArray(), orderMsgSerde))
                .map(new KeyValueMapper<byte[], OrderMsg, KeyValue<? extends String, ? extends OrderMsg>>() {
                    @Override
                    public KeyValue<? extends String, ? extends OrderMsg> apply(byte[] byteArr, OrderMsg value) {
                        // Derive the new key (tenantId + order key) from the original byte-array key.
                        TenantIdMessageTypeDeserializer deserializer = new TenantIdMessageTypeDeserializer();
                        TenantIdMessageType tenantIdMessageType = deserializer.deserialize(orderMsgTopic, byteArr);
                        String newTenantOrderKey;
                        if ((tenantIdMessageType != null) && (tenantIdMessageType.getMessageType() == 1)) {
                            Long tenantId = tenantIdMessageType.getTenantId();
                            newTenantOrderKey = tenantId.toString() + value.getOrderKey();
                        } else {
                            newTenantOrderKey = value.getOrderKey();
                        }
                        return new KeyValue<String, OrderMsg>(newTenantOrderKey, value);
                    }
                });



        // Group by the new key, apply a tumbling window, and reduce within each window.
        final KTable<Windowed<String>, OrderMsg> orderGrouping = orderMsgStream
                .groupByKey(Serialized.with(Serdes.String(), orderMsgSerde))
                .windowedBy(TimeWindows.of(windowTime).advanceBy(windowTime))
                .reduce(new OrderMsgReducer());


        // Strip the window from the key and write the consolidated result to the output topic.
        orderGrouping.toStream()
                .map(new KeyValueMapper<Windowed<String>, OrderMsg, KeyValue<String, OrderMsg>>() {
                    @Override
                    public KeyValue<String, OrderMsg> apply(Windowed<String> key, OrderMsg value) {
                        return new KeyValue<String, OrderMsg>(key.key(), value);
                    }
                }).to("newone11", Produced.with(Serdes.String(), orderMsgSerde));
  • Possible duplicate of [How to send final kafka-streams aggregation result of a time windowed KTable?](https://stackoverflow.com/questions/38935904/how-to-send-final-kafka-streams-aggregation-result-of-a-time-windowed-ktable) – Matthias J. Sax Feb 21 '18 at 22:36
  • Kafka Streams windowing works just fine, but your expectation is incorrect. In the Streams API, there is no such thing as a closing window -- this allows handling late-arriving data. Check out the docs and blog post: https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html and https://www.confluent.io/blog/watermarks-tables-event-time-dataflow-model/ – Matthias J. Sax Feb 21 '18 at 22:38
  • I realised that I had set StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 and had also set the default commit interval of 1000 ms. Changing these values helps me, to some extent, get the windowing working – Nagabhushan Sheshagiri Rao Feb 22 '18 at 07:21
  • Perhaps update your question with this info, or create a new answer and mark it as accepted. – miguno Feb 22 '18 at 09:21

1 Answer


I realised that I had set StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG to 0 and had also left the commit interval at its default of 1000 ms. With a zero-byte cache, every single update to the windowed KTable is forwarded downstream immediately instead of being buffered and deduplicated. Changing these values helps me, to some extent, get the windowing working.
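Concretely, re-enabling the record cache and lengthening the commit interval would look roughly like the sketch below; the cache size, commit interval, application id, and broker address are illustrative placeholders, not values from the original post:

```java
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "order-consolidation-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Give the record cache some room so updates to the windowed KTable
// are buffered and deduplicated before being forwarded downstream.
props.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); // 10 MB
// Flush caches (and commit) less often than the default 30 s / the 1000 ms
// that was set before, so more per-key updates collapse into one record.
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 30 * 1000L);
```

Note that even with caching enabled this only reduces the number of intermediate results; it does not guarantee exactly one output per window, since Streams has no notion of a "final" window result in these versions.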