I am trying to find out whether a stream can be sent to two sinks based on conditions.

The requirement: the stream carries events, and after transformation all events need to go to one sink (assume a Kafka topic).

Only the error events also need to go to a second sink (assume another Kafka topic).

I have not seen a use case where, once the transformation is done, additional routing logic is applied before the sink. Has anyone done something similar?

mohit nagpal

1 Answer

The best way to do this is with side outputs.

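// Explicit type argument: the diamond operator with an anonymous class does not compile on Java 8.
private static final OutputTag<String> errors = new OutputTag<String>("errors") {};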

...

// in your main() method
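// MyProcessFunction is a placeholder name for your own ProcessFunction subclass
// (ProcessFunction itself is abstract and cannot be instantiated directly).
SingleOutputStreamOperator<T> result = events.process(new MyProcessFunction());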

result.addSink(sink).name("normal output");
result.getSideOutput(errors).addSink(errorSink).name("error output");

...

// in the process function

if (somethingGoesWrong) {
    ctx.output(errors, "error message");
}

While there are other ways to split a stream with Flink, side outputs are very flexible (e.g., the side outputs can have different types) and perform well.
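To make the fragments above concrete, here is a minimal sketch of a complete job in which every record goes to the main output and error records additionally go to a side output in a different shape. It assumes the Flink 1.x DataStream API; the class name `SideOutputRouting`, the `Router` function, and the helpers `isError` and `toErrorRecord` are hypothetical names chosen for illustration, and `print()` stands in for the two Kafka sinks.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class SideOutputRouting {

    // The anonymous subclass ({}) lets Flink capture the side output's element type.
    static final OutputTag<String> ERRORS = new OutputTag<String>("errors") {};

    // Hypothetical rule (an assumption): a record is an error if it starts with "ERROR".
    static boolean isError(String event) {
        return event.startsWith("ERROR");
    }

    // Hypothetical error format (an assumption), standing in for a separate error schema.
    static String toErrorRecord(String event) {
        return "error-record|" + event;
    }

    static class Router extends ProcessFunction<String, String> {
        @Override
        public void processElement(String event, Context ctx, Collector<String> out) {
            // Every record, including error records, goes to the main output.
            out.collect(event.toUpperCase());
            // Error records are additionally emitted to the side output.
            if (isError(event)) {
                ctx.output(ERRORS, toErrorRecord(event));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> events = env.fromElements("ok-1", "ERROR bad record", "ok-2");

        SingleOutputStreamOperator<String> result = events.process(new Router());

        // print() is a placeholder; in a real job these would be the two Kafka sinks.
        result.print("all");
        result.getSideOutput(ERRORS).print("errors");

        env.execute("side-output-sketch");
    }
}
```

Because the main output and the side output are independent streams, each can be given its own sink and its own serialization, which is what allows the error topic to use a different schema.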

David Anderson
  • Thanks. Some input records are error records and some are normal records. All records (including error records) need to go to one output, and the error records also need to go to a separate topic. I guess the same side-output concept can be used. The error record schema is different when sending to the separate topic. For example, with 10 input records (8 normal, 2 errors), all 10 are transformed and go to topic 1, and the 2 error records are transformed separately and also go to topic 2. – mohit nagpal Apr 14 '22 at 01:22
  • Also, are there any special issues, such as performance, with the process function? And I assume regular checkpointing works with this. – mohit nagpal Apr 14 '22 at 06:01
  • @David Is there a way to do the same without a ProcessFunction? For example, assume the use case is streaming data from one source to multiple sinks based on a condition, i.e., there is none of the intermediate logic that side outputs currently support ([here](https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/side_output/)): no keyBy, Process, CoProcess, etc. How can side outputs be achieved in this specific case? – Jaya Ananthram Apr 30 '22 at 19:55