Right now I'm sending failed records from an API call to a GCS bucket that's partitioned by day, e.g.:
gs://path/to/file/2022/07/01
gs://path/to/file/2022/07/02
etc.
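For reference, the day-partitioned pattern above can be computed like this (a `java.time` sketch rather than my actual Joda-based code; the class and method names here are made up):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class DatedPath {
    // Builds yesterday's day-partitioned GCS pattern; "gs://path/to/file"
    // is just the example prefix from above, not a real bucket.
    static String yesterdayPattern(String prefix, LocalDate today) {
        DateTimeFormatter fmt = DateTimeFormatter.ofPattern("yyyy/MM/dd");
        return String.format("%s/%s/*/*.json", prefix, today.minusDays(1).format(fmt));
    }

    public static void main(String[] args) {
        // Running "on 2022/07/02" should target the 2022/07/01 partition
        System.out.println(yesterdayPattern("gs://path/to/file",
                LocalDate.of(2022, 7, 2)));
        // → gs://path/to/file/2022/07/01/*/*.json
    }
}
```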
From there, I'd like to schedule a daily batch job (Apache Beam on Dataflow) that retries the previous day's failed records. The issue is that the date at the end of the GCS path is evaluated only once, when the initial template is uploaded to GCP, and stays fixed at that date no matter when the job actually runs, similar to this and this. I'm using a `ValueProvider` but I can't figure out a workaround.
I've found that when I run the pipeline locally, everything works great, but in Dataflow, only the `DoFn`s within the `expand` function actually get run.
Essentially what I'm doing here is:

1. getting the initial GCS path and appending the current date minus 24 hours to the end in `fileMatch`
2. calling `FileIO.readMatches()` to convert each `match()` to a `ReadableFile`
3. calling `MatchGCSFiles`, which contains the exact code below that uses the value providers to get the current date and append it to the GCS path. I did this to override the existing path because it was the only way I could get it to work; as I understood it, a `DoFn` cannot take empty input (still learning Beam)
4. calling `FileIO.readMatches()` again to convert the new `match()` to a new `ReadableFile`
5. then calling the API

```java
String dateFormat = "yyyy/MM/dd";

ValueProvider<String> date = new ValueProvider<String>() {
    @Override
    public String get() {
        String currentDate = Instant.now().minus(86400000)
                .toDateTime(DateTimeZone.UTC).toString(dateFormat);
        return currentDate;
    }

    @Override
    public boolean isAccessible() {
        return true;
    }
};

ValueProvider<String> gcsPathWithDate = new ValueProvider<String>() {
    @Override
    public String get() {
        return String.format("%s/%s/*/*.json", gcsPathPrefix, date.get());
    }

    @Override
    public boolean isAccessible() {
        return true;
    }
};

fileMatch = FileIO.match().filepattern(gcsPathWithDate.get());
```

```java
PCollectionTuple mixedPColl = input
    .getPipeline()
    .apply("File match", fileMatch)
    .apply("applying read matches", FileIO.readMatches())
    .apply("matching files", ParDo.of(new MatchGCSFiles()))
    .apply("applying read matches", FileIO.readMatches()) // problem here
    .apply("Read failed events from GCS", ParDo.of(new ReadFromGCS()))
    .apply(/* call API */)...
```
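To make the construction-time vs. run-time distinction concrete, here is a plain-Java sketch (no Beam involved; class and variable names are made up) of why calling `.get()` inside `filepattern(...)` freezes the date into the template:

```java
import java.util.function.Supplier;

public class LazyPattern {
    // Mimics appending the date to the GCS prefix; the prefix is the
    // example path from the question, not a real bucket.
    static String buildPattern(String prefix, String date) {
        return prefix + "/" + date;
    }

    public static void main(String[] args) {
        String[] today = {"2022/07/01"};

        // Eager: evaluated once at "construction" -- like calling
        // gcsPathWithDate.get() inside FileIO.match().filepattern(...)
        String eager = buildPattern("gs://path/to/file", today[0]);

        // Lazy: evaluated only when invoked -- what the pipeline would
        // need to do at run time, on the worker
        Supplier<String> lazy = () -> buildPattern("gs://path/to/file", today[0]);

        today[0] = "2022/07/02"; // the template gets run a day later

        System.out.println(eager);      // gs://path/to/file/2022/07/01 (stale)
        System.out.println(lazy.get()); // gs://path/to/file/2022/07/02 (fresh)
    }
}
```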
The problem is in the second `FileIO.readMatches()`: the return type of the preceding step doesn't match its expected input, and the compiler reports:

> reason: no instance(s) of type variable(s) exist so that PCollection conforms to PCollection
I've tried different workarounds for this, but none seem to work.
Is there another or better way to dynamically add or replace the date in the GCS path? I'm still learning Beam, so if I'm doing something wrong please let me know.
Thanks in advance.