
In Apache Flink I have a stream of tuples. Let's assume a really simple Tuple1<String>. The tuple can have an arbitrary value in its value field (e.g. 'P1', 'P2', etc.). The set of possible values is finite, but I don't know the full set beforehand (so there could be a 'P362'). I want to write each tuple to an output location that depends on the value inside the tuple. For example, I would like to have the following file structure:

  • /output/P1
  • /output/P2

In the documentation I only found ways to write to locations that I know beforehand (e.g. stream.writeCsv("/output/somewhere")), but no way of letting the contents of the data decide where the data ends up.

I read about output splitting in the documentation but this doesn't seem to provide a way to redirect the output to different destinations the way I would like to have it (or I just don't understand how this would work).

Can this be done with the Flink API, and if so, how? If not, is there a third-party library that can do it, or would I have to build this myself?

Jean-François Fabre
Jan Thomä

1 Answer


You can implement a custom sink. Implement or extend one of the following:

  • org.apache.flink.streaming.api.functions.sink.SinkFunction
  • org.apache.flink.streaming.api.functions.sink.RichSinkFunction

In your program use:

stream.addSink(SinkFunction<T> sinkFunction);

instead of stream.writeCsv("/output/somewhere").
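As a rough illustration of what such a sink could do per record, here is a minimal sketch of the routing logic in plain Java, with no Flink dependency. KeyedFileRouter and its write method are hypothetical names, not part of Flink; the assumption is that a custom sink would call write(...) from its invoke method and close() when the sink shuts down.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not a Flink class): appends each record to a file
// named after its key, e.g. <baseDir>/P1, <baseDir>/P2, ...
class KeyedFileRouter implements AutoCloseable {
    private final Path baseDir;
    // One open writer per key seen so far; keys are discovered at runtime.
    private final Map<String, BufferedWriter> writers = new HashMap<>();

    KeyedFileRouter(Path baseDir) {
        this.baseDir = baseDir;
    }

    // Assumed to be called once per incoming record, e.g. from the sink's invoke method.
    // Note: the key is used as a file name as-is; real code should validate it.
    void write(String key, String line) throws IOException {
        BufferedWriter writer = writers.get(key);
        if (writer == null) {
            Files.createDirectories(baseDir);
            writer = Files.newBufferedWriter(
                    baseDir.resolve(key),
                    StandardCharsets.UTF_8,
                    StandardOpenOption.CREATE,
                    StandardOpenOption.APPEND);
            writers.put(key, writer);
        }
        writer.write(line);
        writer.newLine();
    }

    // Assumed to be called when the sink is closed; flushes and closes all files.
    @Override
    public void close() throws IOException {
        for (BufferedWriter writer : writers.values()) {
            writer.close();
        }
        writers.clear();
    }
}
```

If the sink runs with a parallelism greater than 1, several instances would write to the same file, so each instance would probably need its own file name per key.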

Matthias J. Sax
Thank you! I checked the implementation of `FileSinkFunction` and came up with something similar on my own. I added the implementation to my question for reference. – Jan Thomä Oct 29 '15 at 13:50