Is it possible to read a Kafka message header, set by a KafkaProducer application, in a Flink Stateful Function written in Python?
The KafkaProducer looks like this:
Set<Header> headers = Collections.singleton(
        new RecordHeader("is_mock_event", "true".getBytes(StandardCharsets.UTF_8)));
ProducerRecord<String, DomainCoreCompositeEvent> producerRecordWithHeader =
        new ProducerRecord<>(producerRecord.topic(), null, producerRecord.key(), producerRecord.value(), headers);
The module.yaml configuration is:
kind: io.statefun.kafka.v1/ingress
spec:
  id: com/my-ingress-summary
  address: kafka-1g-us-east-1.test.aws.my.green:9093
  consumerGroupId: my_consumer_group
  topics:
    - topic: my_topic
      valueType: type.googleapis.com/my-event.input.com.abc.xyz.MyDomainEvent
      targets:
        - com/filter-event-function
The Stateful Function is:
def filter_event_function(context: Context, message: Message):
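To make the goal concrete, here is a minimal sketch of the check the function would ideally perform. It assumes the Kafka headers could somehow be obtained as (key, raw bytes) pairs. Both the `is_mock_event` helper and the `headers` argument are hypothetical names used only for illustration. Whether the StateFun Python SDK exposes headers through `Context` or `Message` at all is exactly the open question here.

```python
from typing import Iterable, Optional, Tuple

# Hypothetical shape: Kafka headers as (key, raw bytes) pairs.
# Nothing in the question confirms that the StateFun Python SDK provides this.
Header = Tuple[str, Optional[bytes]]


def is_mock_event(headers: Iterable[Header]) -> bool:
    """Check the first non-null "is_mock_event" header for the UTF-8 value "true"."""
    for key, value in headers:
        if key == "is_mock_event" and value is not None:
            # The producer writes "true".getBytes(StandardCharsets.UTF_8).
            return value.decode("utf-8").strip().lower() == "true"
    # No usable header found: treat the record as a regular event.
    return False
```

With something like this available, `filter_event_function` could branch on `is_mock_event(...)` before processing the `MyDomainEvent` payload.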