0

In this question, we know

 PCollection<String> lines = p.apply(TextIO.read()
   .from("gs://some-bucket/many/files/*")
   .withHintMatchesManyFiles());

Using this hint causes the transforms to execute in a way optimized for reading a large number of files: the number of files that can be read in this case is practically unlimited, and most likely the pipeline will run faster, cheaper and more reliably than without this hint.

However, the step of pipeline is stuck with codes as below

   PCollection<String> lines = pipeline.apply("readDataFromGCS",
          TextIO.read().from(sourcePath + "/prefix*")
                       .withHintMatchesManyFiles()
                       .watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()));

And there are about 10 ~ 30MB new files uploaded to GCS every minutes.

However, we try read files from GCS in pub/sub, the pipeline could work well.

   raw_event = p.apply("Read Sub Message", PubsubIO.readStrings().fromTopic(options.getTopic()))
           .apply("Extract File Name", ParDo.of(new ExtractFileNameFn()))
           .apply("Read file matchall", FileIO.matchAll())
           .apply("Read file match", FileIO.readMatches())
           .apply("Read file", TextIO.readFiles());

Anything am I missing here? or is there any other ways to read large number of files from GCS more efficiently?


The work flow of my pipeline is reading data from GCS and sink to Pub/Sub after data processing.

Beam version: 2.16.0

Soni Sol
  • 2,367
  • 3
  • 12
  • 23
zangw
  • 43,869
  • 19
  • 177
  • 214
  • are you using BQ as a destination? – Jayadeep Jayaraman Nov 13 '19 at 04:53
  • @jjayadeep, the sink is Pub/Sub – zangw Nov 13 '19 at 05:32
  • What version of beam are you using ? Also, how many files are there on an average? – Jayadeep Jayaraman Nov 13 '19 at 05:39
  • @jjayadeep, there are about 8 ~ 10 zip files every 2 minutes, and the size of file is 10M bytes. The beam version I used is 2.16 – zangw Nov 13 '19 at 05:46
  • From this link - https://stackoverflow.com/questions/45362108/how-can-i-improve-performance-of-textio-or-avroio-when-reading-a-very-large-numb it looks like `withHintMatchesManyFiles` is not designed to work efficiently for few files. Can you remove the hint and see if this solves your problem? – Jayadeep Jayaraman Nov 13 '19 at 05:48
  • @jjayadeep, when I remove `withHintMatchesManyFiles`, the pipeline is still stuck. Which is the first try in my codes, and someone in GCP told me to try `withHintMatchesManyFiles`. Unfortunately, it still failed. – zangw Nov 13 '19 at 05:52
  • Also, in peak time of data processing, there are more than 10 zip files uploaded to GCP. – zangw Nov 13 '19 at 05:54
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/202246/discussion-between-jjayadeep-and-zangw). – Jayadeep Jayaraman Nov 13 '19 at 05:55
  • If the files don't have .zip extension can you add the `withCompressionType` explicitly as `TextIO.read().from(filepattern).withCompressionType(...)` – Jayadeep Jayaraman Nov 13 '19 at 06:01
  • @jjayadeep, the file extension is `.gz` – zangw Nov 13 '19 at 06:05
  • @jjayadeep, Also I have test `TextIO.read().from()` with few files, it could work well... – zangw Nov 13 '19 at 06:08
  • Are you saying that this `PCollection lines = pipeline.apply("readDataFromGCS", TextIO.read().from(sourcePath + "/prefix*") .watchForNewFiles(Duration.standardMinutes(1), Watch.Growth.never()));` is working ? – Jayadeep Jayaraman Nov 13 '19 at 06:09
  • @jjayadeep, yes, with several few files. it could work well... so weriod.. – zangw Nov 13 '19 at 06:11
  • That is what I mentioned in my earlier comment that `withHintMatchesManyFiles` will work well when you have thousands of files. For few files it is not efficient and this hint `should not` be used. – Jayadeep Jayaraman Nov 13 '19 at 06:12
  • Yes, I also test it without withHintMatchesManyFiles with few files. it could work well... – zangw Nov 13 '19 at 06:13
  • @jjayadeep, I have done lots of work before I ask question in SO – zangw Nov 13 '19 at 06:14
  • Cool. So I think for your requirement if you remove `withHintMatchesManyFiles` it should work well. – Jayadeep Jayaraman Nov 13 '19 at 06:14
  • @jjayadeep, let me clarify myself clearly, without and with `withHintMatchesManyFiles` for few test files under test gcs bucket, the pipeline works well. BUT, when pipeline try to read data from real data bucket, it failed again. – zangw Nov 13 '19 at 07:11
  • I think the best thing for you would be to open a support ticket and the support team can look at if there is anything specific with the bucket or other configuration. Just by looking at the code I don't see any issues. – Jayadeep Jayaraman Nov 13 '19 at 08:59
  • @jjayadeep, ticket has been opened, two weeks later, no root cause was found . Just post one question. In case of some guys met this issue before? – zangw Nov 13 '19 at 09:28

1 Answers1

1

When you attempt to read zipped/compressed files with TextIO.read() via Dataflow, the compressed file can only be decompressed by a single worker and a single thread of said worker. This results in your pipeline waiting for that single worker to decompress all the data,as such, the system outputs a warning message stating that your pipeline is stuck, but when in fact, the pipeline is not stuck but is simply attempting to decompress your data. At this time, there is no parallel decompression when streaming data.

Javier Bóbeda
  • 468
  • 2
  • 10