
I have a Flink job that consumes Kafka topics and passes the records through a chain of operators. What is the best way to deal with exceptions that are thrown partway through the pipeline?

My goal is to have a centralized place to handle exceptions that may be thrown from different operators. Here is my current solution:

Use a ProcessFunction and, in the catch block, emit the exception to a side output through the context. A separate sink function at the end of the job consumes that side output and calls an external service to update the status of another related job.

However, with this approach it seems I still need to call collector.collect() with a null value so that processing continues through the following operators and reaches the last stage, where the side output flows into the separate sink function. Is this the right way to do it?

Also, I'm not sure what actually happens if I don't call collector.collect() inside an operator. Would it hang there and cause a memory leak?

Sicong

1 Answer


It's fine to not call collector.collect(): an operator may emit zero records for a given input, and skipping the call does not make the job hang or leak memory. You also don't need to call collect() with a null value when you use the side output to capture the exception, because each operator can have its own side output. Finally, if you have multiple such operators with a side output for exceptions, you can union() the side outputs together before sending that combined stream to a sink.
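To make the routing concrete, here is a minimal plain-Java model of that behavior. It is not Flink code: `Operator`, `run`, `PARSE` and `DIVIDE` are hypothetical stand-ins so the snippet runs without Flink on the classpath. In a real job the same shape would roughly correspond to a ProcessFunction calling `ctx.output(...)` with an `OutputTag`, then `getSideOutput(...)` and `union(...)` on the resulting streams.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class SideOutputModel {
    /** Hypothetical stand-in for an operator: per input it may emit to main, to the side output, or to neither. */
    public interface Operator<I, O> {
        void process(I value, Consumer<O> out, Consumer<String> errors);
    }

    /** Runs one operator over its input and returns only what reached the main output. */
    public static <I, O> List<O> run(Operator<I, O> op, List<I> input, List<String> errors) {
        List<O> main = new ArrayList<>();
        for (I value : input) {
            op.process(value, main::add, errors::add);
        }
        return main;
    }

    // Stage 1: on failure, emit only to the side output. Nothing (not even null) goes to main.
    public static final Operator<String, Integer> PARSE = (value, out, errors) -> {
        try {
            out.accept(Integer.parseInt(value));
        } catch (NumberFormatException e) {
            errors.accept("parse: bad input '" + value + "'");
        }
    };

    // Stage 2: has its own side output for its own failures.
    public static final Operator<Integer, Integer> DIVIDE = (value, out, errors) -> {
        try {
            out.accept(100 / value);
        } catch (ArithmeticException e) {
            errors.accept("divide: bad input " + value);
        }
    };

    public static void main(String[] args) {
        List<String> parseErrors = new ArrayList<>();
        List<String> divideErrors = new ArrayList<>();

        List<Integer> parsed = run(PARSE, List.of("4", "oops", "0"), parseErrors);
        List<Integer> divided = run(DIVIDE, parsed, divideErrors);

        // Models union(): merge both error streams before handing them to one sink.
        List<String> allErrors = new ArrayList<>(parseErrors);
        allErrors.addAll(divideErrors);

        System.out.println("main output: " + divided);
        System.out.println("error sink:  " + allErrors);
    }
}
```

The point of the model is that the record that failed in stage 1 never reaches stage 2, yet its error still arrives at the error sink, because the side output is read directly from the operator that produced it rather than being carried through the rest of the pipeline.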

If for some reason the downstream operator(s) need to know that there was an exception, then one approach is to output an Either<good result, Exception>, but then each downstream operator would of course need to have code to check what it's receiving.
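As a rough sketch of what that check looks like, assuming a hand-rolled `Either` (the class below and the `parse` / `doubleIt` helpers are hypothetical, written so the snippet compiles on its own; Flink also ships its own `org.apache.flink.types.Either`, which could be used instead):

```java
public class EitherSketch {
    /** Minimal hypothetical Either: holds either a good value or the exception that replaced it. */
    public static final class Either<T> {
        private final T value;
        private final Exception error;

        private Either(T value, Exception error) {
            this.value = value;
            this.error = error;
        }

        public static <T> Either<T> ok(T value) { return new Either<>(value, null); }
        public static <T> Either<T> failed(Exception e) { return new Either<>(null, e); }

        public boolean isOk() { return error == null; }
        public T value() { return value; }
        public Exception error() { return error; }
    }

    // Upstream operator: always emits a record, wrapping any failure instead of dropping it.
    public static Either<Integer> parse(String raw) {
        try {
            return Either.ok(Integer.parseInt(raw.trim()));
        } catch (NumberFormatException e) {
            return Either.failed(e);
        }
    }

    // Downstream operator: has to check which side it received before doing its own work.
    public static Either<Integer> doubleIt(Either<Integer> in) {
        if (!in.isOk()) {
            return in; // pass the failure along unchanged
        }
        return Either.ok(in.value() * 2);
    }

    public static void main(String[] args) {
        Either<Integer> good = doubleIt(parse("21"));
        Either<Integer> bad = doubleIt(parse("abc"));
        System.out.println(good.isOk() + " " + good.value());
        System.out.println(bad.isOk() + " " + bad.error().getClass().getSimpleName());
    }
}
```

The trade-off compared with side outputs is visible in `doubleIt`: every downstream step carries that extra branch, which is only worth it when those steps really need to see the failures.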

kkrugler
  • Thanks! I'm also curious what the exact behavior is when `collector.collect()` is not called inside an operator. And if one operator produces a side output, the rest of the operators wouldn't get executed, right, assuming I don't `collect()` at the end? Then I'd need to add a sink function after each operator to catch potential side output. – Sicong Sep 21 '18 at 00:10
  • If you never call `collect()` inside of your operator, then there's no data flowing through the stream to the next operator, so yes...in that case the downstream operators wouldn't get called, because they wouldn't have any data to process. – kkrugler Sep 22 '18 at 14:31