
Is there a way to find the total number of rows in a file using Google Dataflow? Any code sample or pointer would be a great help. Basically, I have a method like this:

int getCount(String fileName) {}

The method above should return the total row count, and its implementation should be Dataflow code.

Thanks

Programmer
  • Could you clarify how big the file is, and why you want to use Dataflow for this as opposed to a straightforward Java program that reads the file and counts lines one by one? Unless the file is at least many gigabytes in size and is already stored on Google Cloud Storage, Dataflow is most likely not the best tool for the job. – jkff Aug 30 '16 at 22:17
  • Thanks for looking at it. Yes, the file is a .gz file several GB in size, and it is located in a GCS bucket. Apart from Dataflow, can you think of any other way, or do you have sample code or a link I could look at? I am able to read the file from the GCS bucket using Dataflow into a PCollection<String> and apply Count.Globally to it, but this gives me a PCollection<Long>, so I am unable to return a single long value from my method. Thanks. – Programmer Aug 31 '16 at 02:09
  • maybe this is of help? https://cloud.google.com/dataflow/model/combine – chchrist Aug 31 '16 at 10:29
  • @chchrist: As I mentioned, I have already used Count.Globally, but the issue is that it also returns a PCollection, whereas I need my method to return a Long value. I am not sure how I can read a value from a PCollection. – Programmer Aug 31 '16 at 14:26

1 Answer


Your use case does not seem to require distributed processing: the file is gzip-compressed, so it cannot be split and read in parallel. However, you may still find the Dataflow APIs useful for their easy access to GCS and automatic decompression.
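To illustrate the non-distributed route mentioned above, here is a minimal sketch in plain Java that counts lines in a gzip-compressed stream using only the standard library. It assumes you already have an InputStream over the file's bytes; how that stream is obtained from GCS is not shown. The class and method names (GzipLineCounter, countLines, gzip) are hypothetical and chosen only for illustration.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

final class GzipLineCounter {
    /** Counts the lines in a gzip-compressed text stream, decompressing on the fly. */
    static long countLines(InputStream gzipped) throws IOException {
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(new GZIPInputStream(gzipped), StandardCharsets.UTF_8))) {
            long count = 0;
            // readLine() returns null at end of stream; the lines themselves are discarded.
            while (reader.readLine() != null) {
                count++;
            }
            return count;
        }
    }

    /** Demo helper (hypothetical): gzips a string in memory so the example needs no real file. */
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(bytes)) {
            out.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = gzip("a\nb\nc\n");
        System.out.println(countLines(new ByteArrayInputStream(data))); // prints 3
    }
}
```

Because the stream is read line by line, memory use stays small even for multi-gigabyte inputs; the cost is that a single process does all the reading, which is also the case for an unsplittable gzip file in a pipeline.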

Since you also want to get the result out of your pipeline as an actual Java object, you need to use the Direct runner. It runs in-process, without talking to the Dataflow service or doing any distributed processing; in return, it lets you extract PCollections into Java objects.

Something like this:

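import com.google.cloud.dataflow.sdk.Pipeline;
import com.google.cloud.dataflow.sdk.io.TextIO;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.runners.DirectPipelineRunner;
import com.google.cloud.dataflow.sdk.transforms.Count;
import com.google.cloud.dataflow.sdk.values.PCollection;

// Dataflow SDK 1.x API; Count.globally() yields a one-element PCollection.
PipelineOptions options = ...;
DirectPipelineRunner runner = DirectPipelineRunner.fromOptions(options);
Pipeline p = Pipeline.create(options);
PCollection<Long> countPC =
    p.apply(TextIO.Read.from("gs://..."))
     .apply(Count.<String>globally());
DirectPipelineRunner.EvaluationResults results = runner.run(p);
long count = results.getPCollection(countPC).get(0);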
jkff
  • How can we count the number of rows in the input file if we use DataflowRunner? The above solution doesn't work in my case. – viveknaskar Sep 17 '20 at 12:01
  • You can still use Count.globally() but you'll have to make your pipeline write the resulting 1-element PCollection into a file which you can read from your program after the pipeline completes. – jkff Sep 17 '20 at 21:13
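The "read it back after the pipeline completes" half of that suggestion could look roughly like the sketch below. It is only an illustration under several assumptions: the pipeline formatted the Long as decimal text and wrote it as a single line to a single output file, and that file is reachable as a local Path (fetching it from GCS is not shown). CountFileReader, readCount and parseCount are hypothetical names, not part of any Dataflow API.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

final class CountFileReader {
    /** Parses the single non-blank line of the pipeline's output as a long. */
    static long parseCount(List<String> lines) {
        String value = null;
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate blank lines, e.g. a trailing newline
            }
            if (value != null) {
                throw new IllegalArgumentException("expected exactly one value, found more");
            }
            value = trimmed;
        }
        if (value == null) {
            throw new IllegalArgumentException("no count found in output");
        }
        return Long.parseLong(value);
    }

    /** Reads the output file (assumed to be local and small) and returns the count. */
    static long readCount(Path file) throws IOException {
        return parseCount(Files.readAllLines(file, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for the file the pipeline would have written.
        Path file = Files.createTempFile("row-count", ".txt");
        Files.write(file, "42\n".getBytes(StandardCharsets.UTF_8));
        System.out.println(readCount(file)); // prints 42
        Files.delete(file);
    }
}
```

Rejecting output with more than one value is a deliberate choice here: a global count should produce exactly one element, so extra lines most likely mean the wrong file was read.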