2

Is it possible in Apache Flink to write to multiple text files depending on a key? For instance, I have some data like this.

key1, foo, bar key2, baz, foo key3, etc, etc

The value of the key isn’t known at compile time; new keys would come in and I’d like to write the results for that key to a separate file to those of the other keys.

I'd expect to see 3 files, named 'key1.txt', 'key2.txt' and 'key3.txt'.

Is this something Flink can do out of the box?

Garth Oatley
  • 23
  • 1
  • 7

2 Answers2

2

You can try the following sink implementation, which can be used with a KeyedStream:

// Key the stream by the first tuple field (f0).
KeyedStream<Tuple2<String, String>, Tuple> keyedDataStream = dataStream.keyBy(0);

// The sink's element type must match the element type of the keyed stream,
// otherwise addSink(...) does not type-check.
StreamKeyPartitionerSink<Tuple2<String, String>> sinkFunction =
    new StreamKeyPartitionerSink<Tuple2<String, String>>(
        "../data/key_grouping", "f0"); // f0 is the key field name
keyedDataStream.addSink(sinkFunction);

For more info about state management in Flink, see https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html#keyed-state — I used keyed state to manage the state per key.

import java.io.BufferedWriter;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.lang.reflect.Field;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

/**
 * * Flink sink writes tuples to files partitioned by their keys, which also writes the records as
 * batches.
 *
 * @param <IN> Input tuple type
 * 
 * @author ehabqadah
 */
public class StreamKeyPartitionerSink<IN> extends RichSinkFunction<IN> {

  private transient ValueState<String> outputFilePath;
  private transient ValueState<List<IN>> inputTupleList;
  /**
   * Number of rcords to be hold before writing.
   */
  private int writeBatchSize;
  /**
   * The output directory path
   */
  private String outputDirPath;
  /**
   * The name of the input tuple key
   */
  private String keyFieldName;



  public StreamKeyPartitionerSink(String outputDirPath, String keyFieldName) {

    this(outputDirPath, keyFieldName, 1);
  }

  /**
   * 
   * @param outputDirPath- writeBatchSize the size of on hold batch before write
   * @param writeBatchSize - output directory
   */
  public StreamKeyPartitionerSink(String outputDirPath, String keyFieldName, int writeBatchSize) {

    this.writeBatchSize = writeBatchSize;
    this.outputDirPath = outputDirPath;
    this.keyFieldName = keyFieldName;
  }

  @Override
  public void open(Configuration config) {
    // initialize state holders
 `//for more info about state management check  `//
    ValueStateDescriptor<String> outputFilePathDesc =
        new ValueStateDescriptor<String>("outputFilePathDesc",
            TypeInformation.of(new TypeHint<String>() {}));

    ValueStateDescriptor<List<IN>> inputTupleListDesc =
        new ValueStateDescriptor<List<IN>>("inputTupleListDesc",
            TypeInformation.of(new TypeHint<List<IN>>() {}));

    outputFilePath = getRuntimeContext().getState(outputFilePathDesc);
    inputTupleList = getRuntimeContext().getState(inputTupleListDesc);

  }

  @Override
  public void invoke(IN value) throws Exception {
    List<IN> inputTuples =
        inputTupleList.value() == null ? new ArrayList<IN>() : inputTupleList.value();

    inputTuples.add(value);
    if (inputTuples.size() == writeBatchSize) {

      writeInputList(inputTuples);
      inputTuples = new ArrayList<IN>();
    }

    // update the state
    inputTupleList.update(inputTuples);

  }

  /**
   * Write the tuple list, each record in separate line
   * 
   * @param tupleList
   * @throws Exception
   */
  public void writeInputList(List<IN> tupleList) {

    String path = getOrInitFilePath(tupleList);
    try (PrintWriter outStream = new PrintWriter(new BufferedWriter(new FileWriter(path, true)))) {
      for (IN tupleToWrite : tupleList) {
        outStream.println(tupleToWrite);
      }
    } catch (IOException e) {
      throw new RuntimeException("Exception occured while writing file " + path, e);
    }
  }

  private String getOrInitFilePath(List<IN> tupleList) {

    IN firstInstance = tupleList.get(0);
    String path = null;
    try {
      path = outputFilePath.value();

      if (path == null) {
        Field keyField = firstInstance.getClass().getField(keyFieldName);
        String keyValue = keyField.get(firstInstance).toString();
        path = Paths.get(outputDirPath, keyValue + ".txt").toString();

        setUpOutputFilePathPath(outputDirPath, path);
        // save the computed path for this key
        outputFilePath.update(path);
      }
    } catch (IOException | NoSuchFieldException | SecurityException | IllegalArgumentException
        | IllegalAccessException e) {
      throw new RuntimeException(
          "ExceptionsetUpOutputFilePathPath occured while fetching the value of key field " + path,
          e);
    }
    return path;
  }

  private void setUpOutputFilePathPath(String outputDirPath, String path) throws IOException {
    if (!Files.exists(Paths.get(outputDirPath))) {
      Files.createDirectories(Paths.get(outputDirPath));

    }
    // create the file if it does not exist and delete its content
    Files.write(Paths.get(path), "".getBytes(), StandardOpenOption.CREATE,
        StandardOpenOption.TRUNCATE_EXISTING);

  }
}
Ehab Qadah
  • 570
  • 4
  • 12
1

That is not possible out of the box. However, you can implement your own output format and use it via result.output(...) (for the Batch API); see https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/batch/index.html#data-sinks

For Streaming API, it would be stream.addSink(...); see https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/index.html#data-sinks

Matthias J. Sax
  • 59,682
  • 7
  • 117
  • 137
  • I found something related here, and the original author answered the question too. http://stackoverflow.com/questions/33414141/flink-streaming-how-to-output-one-data-stream-to-different-outputs-depending-on – Garth Oatley Mar 20 '17 at 16:09