There are many older questions about rate limiting / throttling a Kafka consumer in Flink.

But none of those approaches can be used with Flink 1.15:

  • KafkaFetcher does not expose emitRecord
  • FlinkKafkaConsumer is deprecated
    • KafkaSource is the preferred approach
  • KafkaSource explicitly creates a KafkaRecordEmitter inside its createReader

So, my question is: is there any way to combine ThrottledIterator with KafkaSource?

Peter Csala

1 Answer

You shouldn't apply rate limiting in the main processing thread, as this can (and likely will) block checkpointing. Instead, apply rate limiting in a deserialization schema.

You can create the rate limiter in the deserialization schema's open method and acquire a permit from it in the deserialize method. (I would use a Guava RateLimiter rather than a throttled iterator for this.)
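To make the open/deserialize pattern concrete, here is a minimal, dependency-free sketch. Both class names are hypothetical: `SimpleRateLimiter` is only a stand-in for a blocking limiter such as Guava's `RateLimiter` (created with `RateLimiter.create(permitsPerSecond)`, consumed with `acquire()`), and `ThrottledStringSchema` only mirrors the life cycle of a Flink schema. In a real job the class would presumably extend `AbstractDeserializationSchema<String>` and override `open(InitializationContext)` and `deserialize(byte[])`.

```java
import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;

// Hypothetical stand-in for a blocking limiter (e.g. Guava's RateLimiter):
// acquire() blocks until the next evenly spaced permit is due.
final class SimpleRateLimiter {
    private final long intervalNanos;
    private long nextFreeNanos;

    SimpleRateLimiter(double permitsPerSecond) {
        this.intervalNanos = (long) (TimeUnit.SECONDS.toNanos(1) / permitsPerSecond);
        this.nextFreeNanos = System.nanoTime();
    }

    synchronized void acquire() {
        long now = System.nanoTime();
        // The permit is due at nextFreeNanos, or immediately if that time has passed.
        long deadline = Math.max(nextFreeNanos, now);
        nextFreeNanos = deadline + intervalNanos;
        long waitNanos = deadline - now;
        // parkNanos may return early, so loop until the deadline is reached.
        while (waitNanos > 0) {
            LockSupport.parkNanos(waitNanos);
            waitNanos = deadline - System.nanoTime();
        }
    }
}

// Hypothetical schema showing where the limiter is created and used.
final class ThrottledStringSchema {
    private final double recordsPerSecond;
    // Created in open() rather than in the constructor, on the assumption that
    // the schema object is serialized and shipped to workers but the limiter is not.
    private transient SimpleRateLimiter limiter;

    ThrottledStringSchema(double recordsPerSecond) {
        this.recordsPerSecond = recordsPerSecond;
    }

    // Called once per parallel instance before any record is deserialized.
    void open() {
        limiter = new SimpleRateLimiter(recordsPerSecond);
    }

    // Blocks until a permit is available, then decodes the record.
    String deserialize(byte[] message) {
        limiter.acquire();
        return new String(message, StandardCharsets.UTF_8);
    }
}
```

With the real Flink classes, such a schema would be handed to the source through `KafkaSource.<String>builder().setValueOnlyDeserializer(...)`. Note that the limit then applies per parallel source instance, not to the job as a whole.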

David Anderson
  • So are you suggesting using the rate limiter in our class derived from `AbstractDeserializationSchema`? – Peter Csala May 17 '22 at 16:00
  • [RateLimiter is marked unstable with @Beta](https://github.com/google/guava/issues/5959). As far as I can see, [resilience4j](https://github.com/resilience4j/resilience4j#ratelimiter) is the recommended approach instead. – Peter Csala May 19 '22 at 12:07