I have a flattened PCollection<String> that contains file paths (filepatterns), for example "/this/is/a/123/*.csv" and "/this/is/a/124/*.csv":
PCollection<String> flattenPCollection = pcs.apply(Flatten.<String>pCollections()); // pcs is a PCollectionList<String>
I want to read each file, get its file name, and process its contents:
flattenPCollection
    // Each input element is treated as a filepattern, e.g. "/this/is/a/123/*.csv".
    .apply("Read CSV files", FileIO.matchAll())
    .apply("Read matching files", FileIO.readMatches())
    .apply("Process each file", ParDo.of(new DoFn<FileIO.ReadableFile, String>() {
        @ProcessElement
        public void process(@Element FileIO.ReadableFile file) {
            // We should be able to access the file and its metadata here.
            logger.info("File Metadata resourceId is {}", file.getMetadata().resourceId());
            // Here we read each line and process it.
        }
    }));
The following error occurs:
Caused by: java.io.FileNotFoundException: No files matched spec: bob,22,new york
It seems as though the pipeline is reading the first line of the CSV file and then looking for that string in the file system. What is causing this?
I want to get each file as a FileIO.ReadableFile. I am sure it's something very simple that I am missing. Any help is appreciated.
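The error message fits that diagnosis: FileIO.matchAll() treats every input element as a filepattern, so if the elements reaching it are CSV rows instead of paths, a row such as bob,22,new york is looked up as a file spec and matches nothing. The sketch below reproduces that failure outside Beam. It is a simplified, hypothetical stand-in (plain java.nio glob matching over a hard-coded file list, with made-up file names like people.csv), not Beam's actual matching code.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class MatchSpecDemo {
    // Hypothetical stand-in for the file system: the files that "exist".
    static final List<String> EXISTING_FILES = Arrays.asList(
            "/this/is/a/123/people.csv",
            "/this/is/a/124/people.csv",
            "/this/is/a/124/notes.txt");

    // Simplified model of a match step: treat the element as a glob
    // filepattern and expand it against the known files.
    static List<String> match(String spec) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + spec);
        List<String> matched = new ArrayList<>();
        for (String file : EXISTING_FILES) {
            if (matcher.matches(Paths.get(file))) {
                matched.add(file);
            }
        }
        if (matched.isEmpty()) {
            // Simplification: always fail when nothing matches,
            // mirroring the message from the question.
            throw new IllegalArgumentException("No files matched spec: " + spec);
        }
        return matched;
    }

    public static void main(String[] args) {
        // A real filepattern expands to the matching CSV file.
        System.out.println(match("/this/is/a/123/*.csv"));
        // A CSV row used as a filepattern matches nothing.
        try {
            match("bob,22,new york");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

If this is what is happening, the PCollection fed into FileIO.matchAll() probably holds the output of reading the files (for example, lines produced by a TextIO read upstream) rather than the filepatterns themselves.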
UPDATE
If you have a collection of paths, the approach that worked for me is to loop over each one manually and apply a separate FileIO.match(), FileIO.readMatches(), and ParDo for each path:
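import java.io.IOException;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;

// pathList is a List<String> of filepatterns, e.g. "/this/is/a/123/*.csv".
for (int i = 0; i < pathList.size(); i++) {
    String path = pathList.get(i);
    // Unique names avoid duplicate-name warnings when the same transforms are applied in a loop.
    pipeline.apply("Match files " + i, FileIO.match().filepattern(path))
        .apply("Read matches " + i, FileIO.readMatches())
        .apply("Process files " + i,
            ParDo.of(
                new DoFn<FileIO.ReadableFile, String>() {
                    @ProcessElement
                    public void process(@Element FileIO.ReadableFile file) throws IOException {
                        logger.info("Metadata - {}", file.getMetadata());
                        logger.info("File Contents - {}", file.readFullyAsUTF8String());
                        logger.info("File Metadata resourceId is {}", file.getMetadata().resourceId());
                    }
                }));
}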
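Inside that ParDo, the String returned by file.readFullyAsUTF8String() still has to be split into records. The sketch below is a minimal plain-Java version, assuming each row has exactly three comma-separated fields (name, age, city, guessed from the bob,22,new york row in the error message), no header row, and no quoted fields. The Person type, the parse helper, and the alice row are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

class CsvRows {
    // Hypothetical record type for rows such as "bob,22,new york".
    static final class Person {
        final String name;
        final int age;
        final String city;

        Person(String name, int age, String city) {
            this.name = name;
            this.age = age;
            this.city = city;
        }

        @Override
        public String toString() {
            return name + " (" + age + ") from " + city;
        }
    }

    // Splits the full file text (e.g. what readFullyAsUTF8String() returns)
    // into lines and parses each one. Naive: no quoting, no header row.
    static List<Person> parse(String contents) {
        List<Person> people = new ArrayList<>();
        for (String line : contents.split("\\r?\\n")) {
            if (line.trim().isEmpty()) {
                continue; // skip blank lines
            }
            String[] fields = line.split(",", -1);
            if (fields.length != 3) {
                throw new IllegalArgumentException(
                        "Expected 3 fields but got " + fields.length + ": " + line);
            }
            people.add(new Person(fields[0].trim(),
                    Integer.parseInt(fields[1].trim()),
                    fields[2].trim()));
        }
        return people;
    }

    public static void main(String[] args) {
        String contents = "bob,22,new york\nalice,31,boston\n";
        for (Person p : parse(contents)) {
            System.out.println(p);
        }
    }
}
```

Reading a whole file into one String can be costly for large inputs; for real CSV data with quoted fields or embedded commas, a dedicated CSV parser is the safer choice.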
Thanks to @bigbounty