From this question, we know the following:
PCollection<String> lines = p.apply(TextIO.read()
    .from("gs://some-bucket/many/files/*")
    .withHintMatchesManyFiles());
Using this hint causes the transforms to execute in a way optimized for reading a large number of files: the number of files that can be read in this case is practically unlimited, and most likely the pipeline will run faster, cheaper and more reliably than without this hint.
However, this step of the pipeline gets stuck with the code below:
PCollection<String> lines = pipeline.apply("readDataFromGCS",
TextIO.read().from(sourcePath + "/prefix*")
.withHintMatchesManyFiles()
.watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()));
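For reference, the same one-minute watch can also be expressed with FileIO.match().continuously(), a more general form of the same mechanism; this is only a sketch reusing sourcePath from above, not a confirmed fix for the stall:

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.transforms.Watch;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Match the filepattern once per minute forever, then read each matched file.
PCollection<String> lines = pipeline
    .apply("Match new files", FileIO.match()
        .filepattern(sourcePath + "/prefix*")
        .continuously(Duration.standardMinutes(1), Watch.Growth.never()))
    .apply("Read matches", FileIO.readMatches())
    .apply("Read lines", TextIO.readFiles());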
New files of about 10 to 30 MB are uploaded to GCS every minute.
However, when we instead read the files from GCS via Pub/Sub, the pipeline works well:
PCollection<String> raw_event = p
    .apply("Read Sub Message", PubsubIO.readStrings().fromTopic(options.getTopic()))
    .apply("Extract File Name", ParDo.of(new ExtractFileNameFn()))
    .apply("Read file matchall", FileIO.matchAll())
    .apply("Read file match", FileIO.readMatches())
    .apply("Read file", TextIO.readFiles());
Am I missing anything here? Or is there a more efficient way to read a large number of files from GCS?
The workflow of my pipeline is to read data from GCS and sink it to Pub/Sub after processing.
Beam version: 2.16.0