I have a PCollection of KVs where the key is a GCS file pattern and the value is some additional information about the files (e.g., the "Source" system that generated them). For example:
KV("gs://bucket1/dir1/*", "SourceX"),
KV("gs://bucket1/dir2/*", "SourceY")
I need a PTransform that expands each file pattern to all matching files in the GCS folders while keeping the "Source" field. For example, if there are two files (X1.dat, X2.dat) under dir1 and one file (Y1.dat) under dir2, the output should be:
KV("gs://bucket1/dir1/X1.dat", "SourceX"),
KV("gs://bucket1/dir1/X2.dat", "SourceX"),
KV("gs://bucket1/dir2/Y1.dat", "SourceY")
Could I use FileIO.matchAll() to achieve this? I am stuck on how to join the "Source" field back to the matching files. Here is what I have tried so far (not quite working yet):
    public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> filesAndSources) {
      return filesAndSources
          // Keeps only the file patterns; the "Source" values are dropped at this step.
          .apply("Get file names", Keys.<String>create())
          .apply(FileIO.matchAll())
          .apply(FileIO.readMatches())
          .apply(ParDo.of(
              new DoFn<ReadableFile, KV<String, String>>() {
                @ProcessElement
                public void processElement(ProcessContext c) {
                  ReadableFile file = c.element();
                  String fileName = file.getMetadata().resourceId().toString();
                  c.output(KV.of(fileName, XXXXX)); // How to get the value field ("Source") from the input KV?
                }
              }));
    }
My difficulty is the last line: for XXXXX, how do I get the value field ("Source") from the input KV? Is there any way to "join" the input KV's value back to the expanded keys, given that one key (file pattern) expands to multiple files?
Thank you!
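One way to avoid the join altogether is to do the matching inside your own DoFn, so the "Source" value never leaves scope. The sketch below assumes the Beam Java SDK's FileSystems.match(String, EmptyMatchTreatment) API; the class name ExpandPatternsKeepSource and the helper matchFileNames are hypothetical. It replaces FileIO.matchAll() rather than building on it, and since only file names are needed, FileIO.readMatches() is dropped as well.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * Hypothetical transform: expands KV(filePattern, source) into
 * KV(matchedFileName, source), one output per matching file.
 */
public class ExpandPatternsKeepSource
    extends PTransform<PCollection<KV<String, String>>, PCollection<KV<String, String>>> {

  @Override
  public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> filesAndSources) {
    return filesAndSources.apply("Match patterns, keep source", ParDo.of(new MatchWithSourceFn()));
  }

  /** Matches the pattern in the key and pairs every matched file with the original value. */
  static class MatchWithSourceFn extends DoFn<KV<String, String>, KV<String, String>> {
    @ProcessElement
    public void processElement(ProcessContext c) throws IOException {
      String pattern = c.element().getKey();
      String source = c.element().getValue(); // still in scope, so no join is needed
      for (String fileName : matchFileNames(pattern)) {
        c.output(KV.of(fileName, source));
      }
    }
  }

  /** Hypothetical helper: names of all files matching the pattern; empty if a wildcard matches nothing. */
  public static List<String> matchFileNames(String pattern) throws IOException {
    MatchResult result = FileSystems.match(pattern, EmptyMatchTreatment.ALLOW_IF_WILDCARD);
    List<String> names = new ArrayList<>();
    for (MatchResult.Metadata metadata : result.metadata()) {
      names.add(metadata.resourceId().toString());
    }
    return names;
  }
}
```

It would be applied as filesAndSources.apply(new ExpandPatternsKeepSource()). One trade-off: each pattern is listed by a single DoFn call, so a pattern that matches many files fans out from one worker; following the transform with Reshuffle.viaRandomKey() is a common way to rebalance. This sketch also does not replicate FileIO.matchAll() extras such as continuous watching for new files.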