
I am reading multiple .gz files with Google Cloud Dataflow, and the final destination of the data is BigQuery. The BigQuery table has a dedicated column for each column of the CSV file inside each .gz file, plus one additional column, file_name, which holds the name of the file that the record came from. I read the files with TextIO.Read and apply a ParDo transformation to them. Within the DoFn, is there a way to identify which file the incoming string belongs to?

My code looks like this:

PCollection<String> logs = pipeline.apply(TextIO.Read.named("ReadLines")
                .from("gcs path").withCompressionType(TextIO.CompressionType.AUTO));

PCollection<TableRow> formattedResults = logs.apply(ParDo.named("Format").of(new DoFn<String, TableRow>() {
    // ... processElement builds a TableRow from each line ...
}));

Update 1:

I am now trying the following:

        PCollection<String> fileNamesCollection; // this is a collection of file names
        // channelFactory is captured by the anonymous DoFn below, so Dataflow tries to serialize it
        GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(options.as(GcsOptions.class));
        PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Format").of(new DoFn<String, KV<String,String>>() {
                private static final long serialVersionUID = 1L;

                @Override
                public void processElement(ProcessContext c) throws Exception {
                    ReadableByteChannel readChannel = channelFactory.open(c.element());
                    GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
                    BufferedReader br = new BufferedReader(new InputStreamReader(gzip));

                    String line = null;
                    while ((line = br.readLine()) != null) {
                        c.output(KV.of(c.element(), line));
                    }
                }
        }));

But when I run this program, I get an error saying that channelFactory is not serializable. Is there any channel factory that implements the Serializable interface and can be used here?
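The serialization error can be reproduced with plain Java serialization, which is a minimal stand-in for what Dataflow does when it ships a DoFn to workers. In this sketch, ChannelFactoryStandIn is a hypothetical non-serializable class playing the role of GcsIOChannelFactory, and Fn is a hypothetical serializable interface playing the role of DoFn. An anonymous class that uses a variable from the enclosing method stores it in a hidden field, so serializing the function also tries to serialize the factory; creating the factory inside the method body avoids that.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CaptureDemo {
    // Hypothetical stand-in for GcsIOChannelFactory: does NOT implement Serializable.
    static class ChannelFactoryStandIn {
        String open(String name) { return "opened " + name; }
    }

    // Hypothetical stand-in for DoFn: the function object itself is Serializable.
    interface Fn extends Serializable {
        String process(String element);
    }

    // Returns true if the object can be written with Java serialization.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    // Captures the factory: the anonymous class gets a hidden field referencing it.
    static Fn capturing(final ChannelFactoryStandIn factory) {
        return new Fn() {
            @Override
            public String process(String element) {
                return factory.open(element);
            }
        };
    }

    // Creates the factory as a local variable inside process(), so no field holds it.
    static Fn local() {
        return new Fn() {
            @Override
            public String process(String element) {
                ChannelFactoryStandIn factory = new ChannelFactoryStandIn();
                return factory.open(element);
            }
        };
    }

    public static void main(String[] args) {
        System.out.println(isSerializable(capturing(new ChannelFactoryStandIn()))); // false
        System.out.println(isSerializable(local()));                                // true
    }
}
```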

Update 2: I was finally able to run the program and successfully submit the job. Thanks to jkff for the assistance. Below is my final code; I am pasting it here in the hope that it will be helpful for others too.

        ProcessLogFilesOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
                .as(ProcessLogFilesOptions.class); // ProcessLogFilesOptions is a custom class
        DataflowWorkerLoggingOptions loggingOptions = options.as(DataflowWorkerLoggingOptions.class);
        loggingOptions.setDefaultWorkerLogLevel(Level.WARN);

        String jobName = "unique_job_name";
        options.as(BlockingDataflowPipelineOptions.class).setJobName(jobName);

        Pipeline pipeline = Pipeline.create(options);

        List<String> filesToProcess = new ArrayList<String>();
        for(String fileName : fileNameWithoutHrAndSuffix) { // fileNameWithoutHrAndSuffix has elements like Log_20160921,Log_20160922 etc
            filesToProcess.addAll((new GcsIOChannelFactory(options.as(GcsOptions.class))).match(LogDestinationStoragePath+fileName));
        }
        // at this time filesToProcess will have all logs files name as Log_2016092101.gz,Log_2016092102.gz,.........,Log_2016092201.gz,Log_2016092223.gz
        PCollection<String> fileNamesCollection = pipeline.apply(Create.of(filesToProcess));

        PCollection<KV<String,String>> kv = fileNamesCollection.apply(ParDo.named("Parsing_Files").of(new DoFn<String, KV<String,String>>() {
                private static final long serialVersionUID = 1L;
                @Override
                public void processElement(ProcessContext c) throws Exception {
                    // I have to create _options here because the options and GcsIOChannelFactory are not serializable
                    ProcessLogFilesOptions _options = PipelineOptionsFactory.as(ProcessLogFilesOptions.class);
                    GcsIOChannelFactory channelFactory = new GcsIOChannelFactory(_options.as(GcsOptions.class));
                    // try-with-resources closes the reader, the gzip stream and the channel even if reading fails
                    try (ReadableByteChannel readChannel = channelFactory.open(c.element());
                         GZIPInputStream gzip = new GZIPInputStream(Channels.newInputStream(readChannel));
                         BufferedReader br = new BufferedReader(new InputStreamReader(gzip))) {
                        String line;
                        while ((line = br.readLine()) != null) {
                            c.output(KV.of(c.element(), line));
                        }
                    }
                }
        }));

        // Performing reshuffling here as suggested
        PCollection <KV<String,String>> withFileName = kv.apply(Reshuffle.<String, String>of());

        PCollection<TableRow> formattedResults = withFileName
                .apply(ParDo.named("Generating_TableRow").of(new DoFn<KV<String,String>, TableRow>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public void processElement(ProcessContext c) throws Exception {
                        KV<String,String> kv = c.element();
                        String logLine = kv.getValue();
                        String logFileName = kv.getKey();

                        // do further processing as you want here
                    }
                }));

        // Finally, write formattedResults to the BigQuery table
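One possible shape for the "do further processing" step is sketched below in plain Java, assuming comma-separated lines with no quoted commas. The class RowBuilder and the column names timestamp, user_id and event are hypothetical placeholders for the real CSV layout. Whether file_name should hold the full gs:// path or only the base name is not specified in the question; this sketch keeps only the base name.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class RowBuilder {
    // Hypothetical column names; replace with the real CSV columns of the log files.
    static final List<String> COLUMNS = Arrays.asList("timestamp", "user_id", "event");

    // Keeps only the last path segment, e.g. "gs://bucket/logs/Log_2016092101.gz" -> "Log_2016092101.gz".
    static String baseName(String path) {
        int slash = path.lastIndexOf('/');
        return slash >= 0 ? path.substring(slash + 1) : path;
    }

    // Builds one row from (file name, CSV line). Naive split: quoted commas are NOT handled.
    static Map<String, Object> toRow(String fileName, String csvLine) {
        String[] fields = csvLine.split(",", -1); // -1 keeps trailing empty fields
        if (fields.length != COLUMNS.size()) {
            throw new IllegalArgumentException(
                "Expected " + COLUMNS.size() + " fields but got " + fields.length + ": " + csvLine);
        }
        Map<String, Object> row = new LinkedHashMap<>();
        for (int i = 0; i < fields.length; i++) {
            row.put(COLUMNS.get(i), fields[i]);
        }
        row.put("file_name", baseName(fileName)); // the extra BigQuery column
        return row;
    }

    public static void main(String[] args) {
        Map<String, Object> row = toRow("gs://bucket/logs/Log_2016092101.gz", "2016-09-21T01:00:00,42,login");
        System.out.println(row);
        // {timestamp=2016-09-21T01:00:00, user_id=42, event=login, file_name=Log_2016092101.gz}
    }
}
```

Inside the Generating_TableRow DoFn, each entry of such a map could be copied into a new TableRow with TableRow.set(key, value) before calling c.output(...); that wiring is left out here so the sketch runs without the Dataflow SDK.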
Programmer

1 Answer


Right now, the answer is no. If you need access to file names, unfortunately your best bet in this case is to implement filepattern expansion and file parsing yourself (as a ParDo). Here are a few things you'll need to keep in mind:

Alternatively, you may consider writing your own file-based source. However, in this particular case (.gz files) I would recommend against it, because that API is primarily intended for files that can be read with random access from any offset.
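The filepattern-expansion step can be illustrated with a local-filesystem analogue. This is a minimal sketch assuming files on local disk rather than GCS (on GCS, the question's final code does this step with GcsIOChannelFactory.match); the class name ExpandPattern and the sample file names are illustrative only. It uses java.nio's glob support to turn a pattern like Log_20160921*.gz into a sorted list of concrete paths.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class ExpandPattern {
    // Expands a glob such as "Log_20160921*.gz" against the entries of one directory (non-recursive).
    static List<String> expand(Path dir, String glob) throws IOException {
        List<String> matches = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, glob)) {
            for (Path p : stream) {
                matches.add(p.toString());
            }
        }
        Collections.sort(matches); // directory iteration order is unspecified
        return matches;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("logs");
        for (String name : new String[] {"Log_2016092101.gz", "Log_2016092102.gz", "Log_2016092201.gz"}) {
            Files.createFile(dir.resolve(name));
        }
        // Prints the two Log_20160921* paths; Log_2016092201.gz does not match.
        System.out.println(expand(dir, "Log_20160921*.gz"));
    }
}
```

In a pipeline, the expanded list would be turned into a PCollection (for example with Create.of, as in the question's final code) so that a ParDo can open and parse one file per element and tag every line with its file name.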

jkff
  • I am not entirely clear on two points: parsing the files myself, and using GcsIOChannelFactory to open a stream and then wrapping it in a GZIPInputStream. Can you please share a code sample for reference? – Programmer Sep 19 '16 at 21:19
  • I have just updated my question with Update 1. There is an issue with using GcsIOChannelFactory because it is not serializable. – Programmer Sep 20 '16 at 14:52
  • Correct, it is not serializable, so you should not use it as a member variable in your DoFns - neither explicitly, nor implicitly via capture in an anonymous class. Just make it a local variable. Also, remember to close the resources - the code in your current answer doesn't close them and it will ultimately run out of file descriptors and crash. – jkff Sep 20 '16 at 15:20
  • Thanks @jkff, I was able to submit the job successfully and process the log files. I have pasted my complete code in Update 2. I have just one remaining question: since the options and GcsIOChannelFactory are not serializable, and GcsIOChannelFactory doesn't have an empty constructor, I have to create another options object, _options, to instantiate GcsIOChannelFactory. Is this approach fine? I hope _options doesn't have any internal side effects. – Programmer Sep 20 '16 at 19:00
  • Yes, looks fine to me! – jkff Sep 20 '16 at 20:27
  • Please note https://github.com/apache/incubator-beam/pull/1036 implements a commonly useful transform for preventing fusion, so once that's in, you don't need to roll your own. – jkff Oct 07 '16 at 22:16
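The advice about closing resources can be illustrated with a stdlib-only sketch of the per-file read loop. It uses try-with-resources so the reader, the GZIPInputStream and the underlying stream are closed even if readLine throws. In the pipeline the raw stream would be Channels.newInputStream(readChannel), and closing that stream also closes the channel; here an in-memory gzip payload stands in for a GCS file. The class name GzipLines is hypothetical, and UTF-8 is an assumed encoding (the question's code uses the platform default).

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class GzipLines {
    // Reads every line of a gzip stream and pairs it with the file name,
    // mirroring the (fileName, line) pairs emitted by the "Parsing_Files" DoFn.
    // Each resource is declared separately, so the raw stream is closed even if
    // the GZIPInputStream constructor fails on a bad header.
    static List<Map.Entry<String, String>> readTagged(String fileName, InputStream raw) throws IOException {
        List<Map.Entry<String, String>> out = new ArrayList<>();
        try (InputStream in = raw;
             GZIPInputStream gz = new GZIPInputStream(in);
             BufferedReader br = new BufferedReader(new InputStreamReader(gz, StandardCharsets.UTF_8))) {
            String line;
            while ((line = br.readLine()) != null) {
                out.add(new AbstractMap.SimpleEntry<>(fileName, line));
            }
        }
        return out;
    }

    // Demo helper: gzip a string in memory.
    static byte[] gzip(String text) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bytes)) {
            gz.write(text.getBytes(StandardCharsets.UTF_8));
        }
        return bytes.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] data = gzip("a,1\nb,2\n");
        for (Map.Entry<String, String> e : readTagged("Log_2016092101.gz", new ByteArrayInputStream(data))) {
            System.out.println(e.getKey() + " -> " + e.getValue());
        }
        // Log_2016092101.gz -> a,1
        // Log_2016092101.gz -> b,2
    }
}
```

Lines are collected into a list only so the demo can print them; inside a DoFn each pair would instead be passed straight to c.output(KV.of(fileName, line)), which avoids holding a whole file in memory.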