I have a PCollection of KVs where the key is a GCS file pattern and the value is some additional info about the files (e.g., the "Source" system that generated them). E.g.,

KV("gs://bucket1/dir1/*", "SourceX"),
KV("gs://bucket1/dir2/*", "SourceY")

I need a PTransform to expand the file patterns to all matching files in the GCS folders, and keep the "Source" field. E.g., if there are two files (X1.dat, X2.dat) under dir1 and one file (Y1.dat) under dir2, the output will be:

KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir1/X2.dat", "SourceX"),
KV("gs://bucket1/dir2/Y1.dat", "SourceY")

Could I use FileIO.matchAll() to achieve this? I am stuck on how to combine/join the "Source" field to the matching files. This is something I was trying, not quite there yet:

public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> filesAndSources) {
      return filesAndSources
          .apply("Get file names", Keys.create()) 
          .apply(FileIO.matchAll())
          .apply(FileIO.readMatches())
          .apply(ParDo.of(
            new DoFn<ReadableFile, KV<String, String>>() {

              @ProcessElement
              public void processElement(ProcessContext c) {
                 ReadableFile file = c.element();
                 String fileName = file.getMetadata().resourceId().toString();
                 c.output(KV.of(fileName, XXXXX)); // How to get the value field ("Source") from the input KV?

My difficulty is the last line: for XXXXX, how do I get the value field ("Source") from the input KV? Is there any way to "join" or "combine" the input KV's value back onto the expanded keys, given that one key (file_pattern) is expanded to multiple files?

Thank you!

1 Answer

MatchResult.Metadata contains the resourceId you are already using, but not the GCS path (with wildcards) that it matched.

You can achieve what you want using side inputs. To demonstrate this I created the following filesAndSources (as per your comment this could be an input parameter so it can't be hard-coded downstream):

PCollection<KV<String, String>> filesAndSources = p.apply("Create file pattern and source pairs",
    Create.of(KV.of("gs://" + Bucket + "/sales/*", "Sales"),
              KV.of("gs://" + Bucket + "/events/*", "Events")));

I materialize this into a side input (in this case as a Map). The key will be the glob pattern converted into a regex (thanks to this answer) and the value will be the source string:

final PCollectionView<Map<String, String>> regexAndSources =
filesAndSources.apply("Glob pattern to RegEx", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    String regex = c.element().getKey();

    StringBuilder out = new StringBuilder("^");
    for(int i = 0; i < regex.length(); ++i) {
        final char ch = regex.charAt(i);
        switch(ch) {
            case '*': out.append(".*"); break;
            case '?': out.append('.'); break;
            case '.': out.append("\\."); break;
            case '\\': out.append("\\\\"); break;
            default: out.append(ch);
        }
    }
    out.append('$');
    c.output(KV.of(out.toString(), c.element().getValue()));
}})).apply("Save as Map", View.asMap());
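
The glob-to-regex step can also be tried outside the pipeline. Below is a minimal plain-Java sketch of the same conversion; the class name GlobToRegex and the method convert are hypothetical names chosen for illustration, and, unlike the DoFn above, it also escapes a few more regex metacharacters (such as + and parentheses) that could appear in file names. It assumes only * and ? are used as wildcards.

```java
final class GlobToRegex {
    // Characters that have a special meaning in a regex and must be escaped
    // when they appear literally in the glob (assumption: '*' and '?' are the
    // only wildcards we need to support).
    private static final String SPECIAL = "\\.[]{}()+^$|";

    // Converts a simple glob into an anchored regular expression.
    static String convert(String glob) {
        StringBuilder out = new StringBuilder("^");
        for (int i = 0; i < glob.length(); i++) {
            char ch = glob.charAt(i);
            if (ch == '*') {
                out.append(".*");          // any run of characters
            } else if (ch == '?') {
                out.append('.');           // exactly one character
            } else if (SPECIAL.indexOf(ch) >= 0) {
                out.append('\\').append(ch); // literal metacharacter
            } else {
                out.append(ch);
            }
        }
        return out.append('$').toString();
    }

    public static void main(String[] args) {
        String regex = convert("gs://bucket1/dir1/*");
        System.out.println(regex);
        System.out.println("gs://bucket1/dir1/X1.dat".matches(regex));
        System.out.println("gs://bucket1/dir2/Y1.dat".matches(regex));
    }
}
```

Note that ".*" also matches "/", so the regex is more permissive than the original glob. That should be harmless here because it is only applied to paths that FileIO.matchAll() already returned.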

Then, after reading the file names, we can use the side input to test each path against the patterns and find the matching pattern/source pair:

filesAndSources
  .apply("Get file names", Keys.create()) 
  .apply(FileIO.matchAll())
  .apply(FileIO.readMatches())
  .apply(ParDo.of(new DoFn<ReadableFile, KV<String, String>>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        ReadableFile file = c.element();
        String fileName = file.getMetadata().resourceId().toString();

        Set<Map.Entry<String,String>> patternSet = c.sideInput(regexAndSources).entrySet();    

        for (Map.Entry< String,String> pattern:patternSet) 
        { 
            if (fileName.matches(pattern.getKey())) {
              String source = pattern.getValue();
              c.output(KV.of(fileName, source));
            }
        }
     }}).withSideInputs(regexAndSources))

Note that the regex conversion is done before materializing the side input, rather than here, to avoid duplicate work.
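
The lookup itself can be sketched in plain Java, without Beam, to make the matching logic easier to test. The class SourceLookup and its method sourceFor are hypothetical names, not part of the pipeline above. The sketch also compiles each regex once, since String.matches() recompiles the pattern on every call. One difference to be aware of: it returns only the first matching source, whereas the DoFn above emits one output per matching pattern.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;
import java.util.regex.Pattern;

final class SourceLookup {
    // Regexes compiled once up front; insertion order is preserved so the
    // "first match wins" rule is deterministic.
    private final Map<Pattern, String> compiled = new LinkedHashMap<>();

    // regexAndSources plays the role of the side-input map: regex -> source.
    SourceLookup(Map<String, String> regexAndSources) {
        for (Map.Entry<String, String> e : regexAndSources.entrySet()) {
            compiled.put(Pattern.compile(e.getKey()), e.getValue());
        }
    }

    // Returns the source of the first pattern that matches the whole file name.
    Optional<String> sourceFor(String fileName) {
        for (Map.Entry<Pattern, String> e : compiled.entrySet()) {
            if (e.getKey().matcher(fileName).matches()) {
                return Optional.of(e.getValue());
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> patterns = new LinkedHashMap<>();
        patterns.put("^gs://bucket1/dir1/.*$", "SourceX");
        patterns.put("^gs://bucket1/dir2/.*$", "SourceY");
        SourceLookup lookup = new SourceLookup(patterns);
        System.out.println(lookup.sourceFor("gs://bucket1/dir1/X1.dat"));
        System.out.println(lookup.sourceFor("gs://bucket1/dir2/Y1.dat"));
    }
}
```

Inside a DoFn, the same idea would mean building the compiled map from c.sideInput(regexAndSources) rather than calling fileName.matches() for each entry; whether that is worth it depends on how many patterns and files there are.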

The output, as expected in my case:

Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/events/*
Feb 24, 2019 10:44:05 PM org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
INFO: Matched 2 files for pattern gs://REDACTED/sales/*
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales1.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/sales/sales2.csv, value=Sales
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events1.csv, value=Events
Feb 24, 2019 10:44:05 PM com.dataflow.samples.RegexFileIO$3 processElement
INFO: key=gs://REDACTED/events/events2.csv, value=Events

Full code.

Guillem Xercavins
  • Thanks for the reply. However, in my case, both the file patterns (e.g., gs://bucket1/dir1/*) and the sources (e.g., SourceX) are highly dynamic and unknown upfront. Users can pass in anything, so it's not possible to use hard-coded values in the code. – JacobmcDonald Feb 24 '19 at 02:20
  • Then it's more challenging but definitely doable with side inputs (I just noticed that @Reza Rokni also recommended this while I was writing the example!). See my edited answer and let me know if it helps. – Guillem Xercavins Feb 24 '19 at 22:10