
I have set up a pipeline that has to parse hundreds of *.gz files, and a glob pattern works well for that.

But I need the original name of the file currently being processed, because I want to name the result files after the original files.

Can anyone help me here?

Here is my code.

@Default.String(LOGS_PATH + "*.gz")
String getInputFile();
void setInputFile(String value);


    TextIO.Read read = TextIO.read()
        .withCompressionType(TextIO.CompressionType.GZIP)
        .from(options.getInputFile());
    read.getName();  // returns the name of the transform, not the name of the file being read

    p.apply("ReadLines", read)
     .apply(new CountWords())
     .apply(MapElements.via(new FormatAsTextFn()))
     .apply("WriteCounts", TextIO.write().to(WordCountOptions.LOGS_PATH + "_" + options.getOutput()));

    p.run().waitUntilFinish();
Shaido

1 Answer


This is possible starting with Beam 2.2 by combining FileIO.match(), FileIO.readMatches() and custom code that reads the lines of text. You can already use this at HEAD, or you can wait until release 2.2 is finalized (it's currently in progress).

PCollection<KV<String, String>> filesAndLines =
  p.apply(FileIO.match().filepattern(...))
   // Compression defaults to AUTO, so *.gz files are decompressed based on their extension.
   .apply(FileIO.readMatches())
   .apply(ParDo.of(new DoFn<FileIO.ReadableFile, KV<String, String>>() {
     @ProcessElement
     public void process(ProcessContext c) throws IOException {
       FileIO.ReadableFile f = c.element();
       // Full resource path of the file currently being read.
       String filename = f.getMetadata().resourceId().toString();
       String line;
       // f.open() returns a channel; wrap its stream in a Reader before buffering.
       try (BufferedReader r = new BufferedReader(
           new InputStreamReader(Channels.newInputStream(f.open()), StandardCharsets.UTF_8))) {
         while ((line = r.readLine()) != null) {
           // Key every line by the file it came from.
           c.output(KV.of(filename, line));
         }
       }
     }
   }));
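
To try the per-file reading logic outside a pipeline, here is a minimal sketch in plain Java that does roughly what the body of the DoFn does: open one gzip file and pair each line with the file's name. It uses only the JDK, not Beam. The class name GzLinesWithName and the method readLines are made up for illustration, and UTF-8 input is an assumption.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;

// Hypothetical helper, not part of Beam: pairs each line of a gzip file
// with the name of the file it was read from.
class GzLinesWithName {

  static List<Map.Entry<String, String>> readLines(Path file) throws IOException {
    String filename = file.toString();
    List<Map.Entry<String, String>> out = new ArrayList<>();
    // Decompress, decode as UTF-8 (an assumption about the input), then buffer.
    try (BufferedReader r = new BufferedReader(new InputStreamReader(
        new GZIPInputStream(Files.newInputStream(file)), StandardCharsets.UTF_8))) {
      String line;
      while ((line = r.readLine()) != null) {
        out.add(new SimpleImmutableEntry<>(filename, line));
      }
    }
    return out;
  }

  // Prints "<filename>\t<line>" for every line of every file given as an argument.
  public static void main(String[] args) throws IOException {
    for (String arg : args) {
      for (Map.Entry<String, String> e : readLines(Paths.get(arg))) {
        System.out.println(e.getKey() + "\t" + e.getValue());
      }
    }
  }
}
```

Unlike the DoFn, this sketch collects all lines of a file in memory before returning them, so it is only meant for checking the logic on small files.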
jkff
  • Would you happen to know if there is an easy way to solve my problem here for Python now? https://stackoverflow.com/questions/53404579/dataflow-apache-beam-how-to-access-current-filename-when-passing-in-pattern – WIT Nov 21 '18 at 18:42
  • Using your solution, the final output is a key-value pair. In order to write the actual line (value) to the appropriate file (key), would I need a custom sink? Or is there a better way to specify what needs to be written? – Scicrazed Jul 18 '19 at 21:26