Requirement :- We need to consolidate all the messages having same orderid and perform subsequent operation for the consolidated Message.
Explanation :- Below snippet of code tries to capture all order messages received from a particular tenant and tries to consolidate to a single order message after waiting for a specific period of time It does the following stuff
- Repartition message based on OrderId. So each order message will be having tenantId and groupId as its key
- Perform a groupby key operation followed by windowed operation for 2 minutes
- Reduce operation is performed once windowing is completed.
- Ktable is converted again to stream back and then its output is send to another kafka topic
Expected Output :- If there are 5 messages having same order id being sent with in window period. It was expected that the final kafka topic should have only one message and it would be the last reduce operation message.
Actual Output :- All the 5 messages are seen indicating windowing is not happening before invoking reduce operation. All the messages seen in kafka have proper reduce operation being done as each and every message is received.
Queries :- In kafka stream library version 0.11.0.0 reduce function used to accept timewindow as its argument. I see that this is deprecated in kafka stream version 1.0.0. Windowing which is done in the below piece of code, is it correct ? Is windowing supported in newer version of kafka stream library 1.0.0 ? If so, then is there something can be improved in below snippet of code ?
String orderMsgTopic = "sampleordertopic";
JsonSerializer<OrderMsg> orderMsgJSONSerialiser = new JsonSerializer<>();
JsonDeserializer<OrderMsg> orderMsgJSONDeSerialiser = new JsonDeserializer<>(OrderMsg.class);
Serde<OrderMsg> orderMsgSerde = Serdes.serdeFrom(orderMsgJSONSerialiser,orderMsgJSONDeSerialiser);
KStream<String, OrderMsg> orderMsgStream = this.builder.stream(orderMsgTopic, Consumed.with(Serdes.ByteArray(), orderMsgSerde))
.map(new KeyValueMapper<byte[], OrderMsg, KeyValue<? extends String, ? extends OrderMsg>>() {
@Override
public KeyValue<? extends String, ? extends OrderMsg> apply(byte[] byteArr, OrderMsg value) {
TenantIdMessageTypeDeserializer deserializer = new TenantIdMessageTypeDeserializer();
TenantIdMessageType tenantIdMessageType = deserializer.deserialize(orderMsgTopic, byteArr);
String newTenantOrderKey = null;
if ((tenantIdMessageType != null) && (tenantIdMessageType.getMessageType() == 1)) {
Long tenantId = tenantIdMessageType.getTenantId();
newTenantOrderKey = tenantId.toString() + value.getOrderKey();
} else {
newTenantOrderKey = value.getOrderKey();
}
return new KeyValue<String, OrderMsg>(newTenantOrderKey, value);
}
});
final KTable<Windowed<String>, OrderMsg> orderGrouping = orderMsgStream.groupByKey(Serialized.with(Serdes.String(), orderMsgSerde))
.windowedBy(TimeWindows.of(windowTime).advanceBy(windowTime))
.reduce(new OrderMsgReducer());
orderGrouping.toStream().map(new KeyValueMapper<Windowed<String>, OrderMsg, KeyValue<String, OrderMsg>>() {
@Override
public KeyValue<String, OrderMsg> apply(Windowed<String> key, OrderMsg value) {
return new KeyValue<String, OrderMsg>(key.key(), value);
}
}).to("newone11", Produced.with(Serdes.String(), orderMsgSerde));