Kafka is writing all messages to only one partition of a topic. I am using a KeyedSerializationSchema with FlinkKafkaProducer09 and passing it an attribute from the event stream whose value should drive hash-based partition selection (I want all events with the same attribute value to land in the same partition every time). When I post messages with 50 different attribute values, all of them end up in the same partition. I expected Kafka to spread them across partitions based on the attribute-derived key.
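For reference, this is the key-to-partition mapping I was expecting. It is a minimal sketch, assuming Kafka's DefaultPartitioner behaviour for records that carry a key but no explicit partition: a murmur2 hash of the key bytes, masked to a non-negative value, modulo the number of partitions. The class name `KeyPartitionSketch`, the partition count and the sample keys are hypothetical; the hash follows the murmur2 variant in Kafka's `Utils` class.

```java
import java.nio.charset.StandardCharsets;

// Assumption: mirrors Kafka's DefaultPartitioner for keyed records without an explicit partition.
public class KeyPartitionSketch {

    // Murmur2 hash, following the variant used by Kafka's Utils.murmur2.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the trailing 1-3 bytes (intentional switch fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Same key bytes -> same partition, as long as the partition count does not change.
    static int partitionFor(byte[] keyBytes, int numPartitions) {
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        int numPartitions = 8; // hypothetical partition count
        for (String type : new String[] {"typeA", "typeB", "typeC"}) {
            byte[] key = type.getBytes(StandardCharsets.UTF_8);
            System.out.println(type + " -> partition " + partitionFor(key, numPartitions));
        }
    }
}
```

With 50 distinct attribute values and more than one partition, this mapping should spread the keys over several partitions, which is not what I observe.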
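```java
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer09;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema;

DataStream<JsonObject> myEvents = ....;
FlinkKafkaProducer09<JsonObject> myProducer = new FlinkKafkaProducer09<>(myTopic, new myImplementationOfKeyedSerializationSchema("attributeNameToUseForPartition"), kafkaproperties);
myEvents.addSink(myProducer).setParallelism(1).name("mySink");
....

class myImplementationOfKeyedSerializationSchema implements KeyedSerializationSchema<JsonObject> {

    // Name of the JSON attribute whose value is used as the Kafka message key
    private final String messageKey;

    public myImplementationOfKeyedSerializationSchema(String messageKey) {
        this.messageKey = messageKey;
    }

    @Override
    public byte[] serializeKey(JsonObject event) {
        // Key bytes = the attribute's value; these should decide the partition
        return event.get(messageKey).toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public byte[] serializeValue(JsonObject event) {
        return event.toString().getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public String getTargetTopic(JsonObject event) {
        // null: write to the default topic given to the producer
        return null;
    }
}
```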
I can't figure out why key-based partition selection is not happening. Flink version: 1.1.4