
Right now I'm sending failed records from an API call to a GCS bucket that's partitioned by day.

ex:

gs://path/to/file/2022/07/01

gs://path/to/file/2022/07/02

etc..

From there, I would like to schedule a batch job to retry these failed records the next day using Apache Beam and Dataflow. The issue is that the date at the end of the GCS path is only added when the initial template was uploaded to GCP, and it remains fixed at that date regardless of when you run the job, similar to this and this. I'm using a ValueProvider, but I cannot figure out a workaround for this.

I've found that when I run the pipeline locally, everything works great, but in Dataflow, only the DoFns within the expand function actually get run.

Essentially what I'm doing here is:

  1. getting the initial GCS path and appending the current date minus 24 hours to the end in fileMatch

  2. calling FileIO.readMatches() to convert each match() to a ReadableFile

  3. calling MatchGCSFiles, which contains the exact code below that uses the ValueProviders to get the current date and append it to the GCS path. I did this to override the existing path because it was the only way I could get it to work; as I understand it, a DoFn cannot take in empty input (still learning Beam)

  4. calling FileIO.readMatches() again to convert the new match() to a new ReadableFile, then calling the API.

      String dateFormat = "yyyy/MM/dd";
      ValueProvider<String> date =
          new ValueProvider<String>() {
            @Override
            public String get() {
              String currentDate =
                  Instant.now().minus(86400000).toDateTime(DateTimeZone.UTC).toString(dateFormat);
              return currentDate;
            }

            @Override
            public boolean isAccessible() {
              return true;
            }
          };
      ValueProvider<String> gcsPathWithDate =
          new ValueProvider<String>() {
            @Override
            public String get() {
              return String.format("%s/%s/*/*.json", gcsPathPrefix, date.get());
            }

            @Override
            public boolean isAccessible() {
              return true;
            }
          };
      fileMatch = FileIO.match().filepattern(gcsPathWithDate.get());
    }

    PCollectionTuple mixedPColl =
        input
            .getPipeline()
            .apply("File match", fileMatch)
            .apply("applying read matches", FileIO.readMatches())
            .apply("matching files", ParDo.of(new MatchGCSFiles()))
            .apply("applying read matches", FileIO.readMatches()) // problem here
            .apply("Read failed events from GCS", ParDo.of(new ReadFromGCS()))
            .apply(/* call API */)...
    

The problem is in the second FileIO.readMatches(): the return type does not match, with the reason given as no instance(s) of type variable(s) exist so that PCollection conforms to PCollection.

I've tried different workarounds for this, but none seem to work.

Is there another/better way to dynamically add/replace the date in the GCS path? I'm still learning Beam so if I'm doing something wrong please let me know.

Thanks in advance.

1 Answer


It seems that there are a few improvements you can apply:

  • Your pipeline always reads yesterday's files, so you don't need a runtime parameter (and the usage of ValueProvider in your example is not right, since it's not used as a runtime parameter provided by a pipeline option).

    • If you do need to set an arbitrary date, you'll have to follow the example to create a pipeline option.
    import org.apache.beam.sdk.options.Default;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.ValueProvider;

    public interface MyOptions extends PipelineOptions {
      @Description("Date in yyyy-MM-dd")
      @Default.String("2022-01-01")
      ValueProvider<String> getDateValue();

      void setDateValue(ValueProvider<String> value);
    }
    

    Then use it in your DoFns.

    ...
    @ProcessElement
    public void process(ProcessContext c) {
      MyOptions ops = c.getPipelineOptions().as(MyOptions.class);
      // Use it.
      ...(ops.getDateValue())
    }
    ...
    

    When you start a job, you have to provide the pipeline option for that arbitrary date, just like you set any other PipelineOptions.
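    For example, here is a minimal sketch of wiring the option up at launch time (assuming the MyOptions interface above; the --dateValue flag name is what Beam derives from the getDateValue() getter, and the date is supplied as a command-line argument such as --dateValue=2022-07-11):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    // Register the custom options class so --dateValue is recognized, then build
    // the pipeline from the command-line args passed at launch time.
    PipelineOptionsFactory.register(MyOptions.class);
    MyOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);
    Pipeline pipeline = Pipeline.create(options);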

  • You can always get yesterday's date programmatically (see Get yesterday's date using Date):

    Instant now = Instant.now();
    Instant yesterday = now.minus(1, ChronoUnit.DAYS);
    

    and then use it directly in your pipeline.
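    A minimal sketch of that construction-time approach, assuming the gcsPathPrefix and yyyy/MM/dd layout from the question (pipeline and gcsPathPrefix are placeholders for your own objects, and java.time's LocalDate is used here for the formatting):

    import java.time.LocalDate;
    import java.time.ZoneOffset;
    import java.time.format.DateTimeFormatter;
    import org.apache.beam.sdk.io.FileIO;
    import org.apache.beam.sdk.io.fs.MatchResult;
    import org.apache.beam.sdk.values.PCollection;

    // Compute yesterday's date (UTC) at pipeline construction time ...
    String yesterday =
        LocalDate.now(ZoneOffset.UTC)
            .minusDays(1)
            .format(DateTimeFormatter.ofPattern("yyyy/MM/dd"));

    // ... and bake it into the filepattern; no ValueProvider needed.
    String filepattern = String.format("%s/%s/*/*.json", gcsPathPrefix, yesterday);

    PCollection<MatchResult.Metadata> matches =
        pipeline.apply("File match", FileIO.match().filepattern(filepattern));
    PCollection<FileIO.ReadableFile> files =
        matches.apply("Read matches", FileIO.readMatches());

    Note that, as discussed in the comments below, this value is only re-evaluated when the pipeline graph is actually rebuilt (e.g. when launching the job directly or via a flex template), not when re-running a pre-staged classic template.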

ningk
  • Thanks for the reply. When using the PipelineOptions interface with a ValueProvider, how does it get the date? Does it have to be passed in via command line args? I would like it to just automatically get the correct date when it runs on its own without my intervention. – mikeWazowski Jul 12 '22 at 19:47
  • Also, where would you use the ops.getDateValue() you included? As in, right now I use it outside the DoFns at the beginning of the expand function. Would you use it there or somewhere else? Thanks again – mikeWazowski Jul 12 '22 at 19:48
  • It has to be passed in via command line args. But you can write a wrapper script to automate the process. Additionally, if you just want to use yesterday's date, you don't have to use the pipeline option or ValueProvider at all. Just creating and using the value in your pipeline should be sufficient. – ningk Jul 12 '22 at 20:08
  • You should be able to use the value outside of the DoFn. In fact, you should be able to use the pipeline options, see the WordCount example where the --inputFile can be provided: https://github.com/apache/beam/blob/master/examples/java/src/main/java/org/apache/beam/examples/WordCount.java#L191. You don't have to use a ValueProvider since you are not creating a Flex template and your input is kind of known at pipeline construction time. – ningk Jul 12 '22 at 20:14
  • So maybe I'm a bit confused. I'm generally only running yesterday's date, so I'd like to avoid using the ValueProvider. I'm creating it and using it within the pipeline at the beginning of the expand, just before it reads from GCS. The problem is that that date (string) does not change once it's uploaded. It looks like only the code within the DoFns is actually run in Dataflow, so the whole date-setup portion does not get run in Dataflow. Is this the wrong spot to use it? public PDone expand(input) { String runDate = options.as(gcsOptions.class).getDateValue().toString(); //here – mikeWazowski Jul 12 '22 at 20:36
  • The reason I'm including that is because the runDate doesn't get updated in Dataflow, and keeps referencing the GCS path from when the template was first uploaded. – mikeWazowski Jul 12 '22 at 20:50
  • Ah, yes. If you are using a flex template and you are trying to set the value instead of generating it at pipeline runtime, you should put it in process, since expand is construction time. The way you are doing it is neither providing a runtime value when launching a job nor generating it during runtime; that's why it didn't work. – ningk Jul 12 '22 at 22:54
  • I see, thanks for the explanation. Can I get around this using the classic template? I tried using the flex template but was unable to get it to work, and the examples were not great. Is there another way to do this with the existing flex template? Or is there a way I can dynamically regenerate the GCS path with the correct date in the `process`? – mikeWazowski Jul 13 '22 at 14:06
  • Reading over your comment again, how can I generate the runtime value when launching a job? I tried adding a runDate parameter in the Data Pipelines UI and setting it to str(datetime.datetime.now()), but that did not work. Thanks again for your help. – mikeWazowski Jul 13 '22 at 15:57
  • You don't have to use the UI to launch a Dataflow job with flex template. You can launch the job with a command: ```gcloud dataflow flex-template run "your-job-`date +%Y%m%d-%H%M%S`" \ --template-file-gcs-location "gs://BUCKET_NAME/your-template.json" \ --parameters runDate=`date +%Y/%m/%d` \ --region "REGION"``` – ningk Jul 13 '22 at 16:58
  • Additionally, you can use another Python script to invoke this command so that you can programmatically generate a bunch of customized runtime parameters. Make sure that you follow the example https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates to set up your metadata file. – ningk Jul 13 '22 at 17:01
  • OK, that makes sense. Would I still use the ValueProviders to read the runtime params? And are there any other changes I have to make to run the flex template? I followed the guide for the classic template, and it seems the only thing I need to do is just run that command you posted. Appreciate the help. – mikeWazowski Jul 13 '22 at 20:09
  • Yes, you have to create the metadata file and stage the flex template before you can use it. – ningk Jul 14 '22 at 00:03