Is it possible to read a Kafka message header, set in a KafkaProducer application, from a Flink Stateful Function written in Python?

The KafkaProducer looks like this:

Set<Header> headers = Collections.singleton(new RecordHeader("is_mock_event", "true".getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, DomainCoreCompositeEvent> producerRecordWithHeader =
        new ProducerRecord<>(producerRecord.topic(), null, producerRecord.key(), producerRecord.value(), headers);

The module.yaml configuration is:

kind: io.statefun.kafka.v1/ingress
spec:
  id: com/my-ingress-summary
  address: kafka-1g-us-east-1.test.aws.my.green:9093
  consumerGroupId: my_consumer_group
  topics:
    - topic: my_topic
      valueType: type.googleapis.com/my-event.input.com.abc.xyz.MyDomainEvent
      targets:
        - com/filter-event-function

The Stateful Function is:

def filter_event_function(context: Context, message: Message):
techlearner
  • It would be great if this is supported. I see a related ticket: https://issues.apache.org/jira/browse/FLINK-26617 – JavaJack Feb 27 '23 at 20:37

0 Answers