
Kafka is sending messages to only one partition of a topic. I used a KeyedSerializationSchema in FlinkKafkaProducer09, passing an attribute from the event stream to be used for hash-based partition selection (I want all events with the same value of that attribute to land in the same partition every time). When I post messages with 50 different attribute values, I see all of them going into the same partition. I was expecting some form of load balancing across partitions by Kafka, based on the key.

DataStream<JsonObject> myEvents = ....;
FlinkKafkaProducer09<JsonObject> myProducer = new FlinkKafkaProducer09<>(myTopic, new myImplementationOfKeyedSerializationSchema("attributeNameToUseForPartition"), kafkaproperties);
myEvents.addSink(myProducer).setParallelism(1).name("mySink");
....

class myImplementationOfKeyedSerializationSchema implements KeyedSerializationSchema<JsonObject> 
{
  private final String messageKey; // name of the JSON attribute to use as the Kafka message key

  public myImplementationOfKeyedSerializationSchema(String messageKey) {
    this.messageKey = messageKey;
  }
  
  @Override
  public byte[] serializeKey(JsonObject event) {
    return event.get(messageKey).toString().getBytes();
  }

  @Override
  public byte[] serializeValue(JsonObject event) {
    return event.toString().getBytes();
  }

  @Override
  public String getTargetTopic(JsonObject event) {
    return null;
  }
}

I am unable to figure out why partition selection is not happening. Flink Version : 1.1.4

user4923462

2 Answers


I haven't dug into the code to see how Flink configures the Kafka producer for the case where you don't provide an explicit partitioner. But it's often problematic to depend on default Kafka behavior when using Flink, as Flink overrides much of it.

At least for the newer KafkaSink, you can specify the partitioner used by the KafkaRecordSerializationSchema, which you can set via the KafkaRecordSerializationSchemaBuilder.setPartitioner() method.
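To illustrate what such a partitioner does, here is a minimal, standalone sketch. The partition(...) signature mirrors the shape of Flink's FlinkKafkaPartitioner (record, key bytes, value bytes, target topic, available partitions), but the class below is plain Java so it runs on its own; it is not compiled against the Flink connector, and the class name is mine.

```java
import java.util.Arrays;

// Sketch of a key-hash partitioner. Records with equal keys always map to
// the same partition; different keys spread across the available partitions.
class KeyHashPartitioner {

    int partition(Object record, byte[] key, byte[] value,
                  String targetTopic, int[] partitions) {
        if (key == null) {
            return partitions[0]; // no key: fall back to the first partition
        }
        // Mask to a non-negative value before taking the modulo.
        int idx = (Arrays.hashCode(key) & 0x7fffffff) % partitions.length;
        return partitions[idx];
    }

    public static void main(String[] args) {
        KeyHashPartitioner p = new KeyHashPartitioner();
        int[] partitions = {0, 1, 2, 3};
        byte[] k = "attributeValue-17".getBytes();
        // Same key always maps to the same partition:
        System.out.println(p.partition(null, k, null, "myTopic", partitions)
                == p.partition(null, k, null, "myTopic", partitions)); // true
    }
}
```

In the real connector you would implement FlinkKafkaPartitioner and hand it to the builder via setPartitioner(); the hashing idea is the same.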

kkrugler

After some more digging into FlinkKafkaProducer09, I came across a small comment in my IDE's parameter popup for the constructor: an optional fourth argument, customPartitioner, can be passed. I originally skipped it, thinking it wasn't needed since I was going for default Kafka partitioning. But the parenthetical in its documentation hinted at why it matters in my case:

customPartitioner A serializable partitioner for assigning messages to Kafka partitions (when passing null, we'll use Kafka's partitioner)

So, passing a simple null in the 4th argument made it work!!

FlinkKafkaProducer09<JsonObject> myProducer = new FlinkKafkaProducer09<>(myTopic, new myImplementationOfKeyedSerializationSchema("attributeNameToUseForPartition"), kafkaproperties, null);

Looks like this is needed for Flink to hand partition selection over to Kafka, based on the key produced by the keyed serialization schema. And now I have partition stickiness based on my custom key.

user4923462