Input CSV:
ID date balance
01 31/01/2021 100
01 28/02/2021 200
01 31/05/2021 500
01 30/06/2021 600

Expected output (since the March and April months are missing, the balance for each missing month is carried forward from the previous month):

ID date balance
01 31/01/2021 100
01 28/02/2021 200
01 31/03/2021 200
01 30/04/2021 200
01 31/05/2021 500
01 30/06/2021 600
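
To make the gap logic concrete, here is a small standalone helper (an illustration only, not part of the pipeline; the MonthGaps and monthEndsBetween names are made up) that uses java.time to enumerate the month-end dates missing between two rows:

import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;

public class MonthGaps {

    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    // Returns the month-end dates strictly between the months of 'from' and 'to'.
    static List<LocalDate> monthEndsBetween(LocalDate from, LocalDate to) {
        List<LocalDate> gaps = new ArrayList<>();
        YearMonth month = YearMonth.from(from).plusMonths(1);
        while (month.isBefore(YearMonth.from(to))) {
            gaps.add(month.atEndOfMonth());
            month = month.plusMonths(1);
        }
        return gaps;
    }

    public static void main(String[] args) {
        // The gap in the sample data: 28/02/2021 -> 31/05/2021.
        List<LocalDate> gaps = monthEndsBetween(
                LocalDate.parse("28/02/2021", FMT),
                LocalDate.parse("31/05/2021", FMT));
        // Prints 31/03/2021 and 30/04/2021, the rows to be filled with balance 200.
        gaps.forEach(d -> System.out.println(d.format(FMT)));
    }
}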

Java Code:

I read this file into a PCollection<Row>:

PCollection<Row> row = /* logic to read CSV */

Updated Code

CSV File:

ID date balance
01 31/01/2021 100
01 28/02/2021 200
01 31/05/2021 500
01 30/06/2021 600

Schema file (schema_ie.avsc):

{
  "type" : "record",
  "name" : "Entry",
  "namespace" : "transform",
  "fields" : [  {
    "name" : "ID",
    "type" : [ "string", "null" ]
  }, {
    "name" : "date",
    "type" : [ "string", "null" ]
  }, {
    "name" : "balance",
    "type" : [ "double", "null" ]
  } ]
}



Pipeline code:

import java.io.File;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.schemas.utils.AvroUtils;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTagList;

public class ForwardFillPipeline {

    public static void main(String[] args) throws IOException {
        
        final File schemaFile = new File("src/main/resources/schema_ie.avsc");

        File csvFile = new File("src/main/resources/CustomerRequestIE.csv");

        Schema schema = new Schema.Parser().parse(schemaFile);

        Pipeline pipeline = Pipeline.create();

        // Convert the Avro schema to a Beam schema
        org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);

        final PCollectionTuple tuples = pipeline

                // Match the CSV file
                .apply("MatchFile", FileIO.match().filepattern(csvFile.getAbsolutePath()))

                // Read the matched file
                .apply("ReadMatches", FileIO.readMatches())

                // Validate each line against the schema, convert it to a Row,
                // and route it to the valid or invalid output
                .apply("ParseRows", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
                        TupleTagList.of(FileReader.invalidTag())));

        // Fetching only valid rows

        final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));


        // Key each row by ID + date, then group. (See the note on ToKV below:
        // for a per-customer forward fill the key should probably be ID alone.)
        ToKV toKV = new ToKV();
        toKV.setColumnName1("ID");
        toKV.setColumnName2("date");
        PCollection<KV<String, Row>> kvRows =
                rows.apply(ParDo.of(toKV)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
        PCollection<KV<String, Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String, Row>create());
        
//      groupedKVRows.apply(ParDo.of(new ForwardFillFn()));
        
        

        pipeline.run().waitUntilFinish();
        System.out.println("The end");

    }
}
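
FileReader is not shown above; for context, a minimal sketch of what a multi-output DoFn used this way could look like (this reconstruction is an assumption based purely on how it is called; the real class may differ):

import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

// Hypothetical reconstruction: parses each matched file line by line, emitting
// schema-conforming rows to validTag() and unparseable lines to invalidTag().
class FileReader extends DoFn<FileIO.ReadableFile, Row> {

    private static final TupleTag<Row> VALID = new TupleTag<Row>() {};
    private static final TupleTag<String> INVALID = new TupleTag<String>() {};

    private final Schema schema;

    FileReader(Schema schema) {
        this.schema = schema;
    }

    static TupleTag<Row> validTag() { return VALID; }

    static TupleTag<String> invalidTag() { return INVALID; }

    @ProcessElement
    public void processElement(ProcessContext context) throws java.io.IOException {
        // The sample file is whitespace-separated; a real CSV would split on commas.
        String[] lines = context.element().readFullyAsUTF8String().split("\r?\n");
        for (int i = 1; i < lines.length; i++) { // i = 1 skips the header row
            String[] parts = lines[i].trim().split("\\s+");
            try {
                context.output(VALID, Row.withSchema(schema)
                        .addValues(parts[0], parts[1], Double.parseDouble(parts[2]))
                        .build());
            } catch (RuntimeException e) {
                context.output(INVALID, lines[i]);
            }
        }
    }
}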


// Keys each row by the concatenation of two configured columns. NOTE: with
// ID + date every key is unique, so the GroupByKey above yields one-element
// groups; for a per-customer forward fill the key would need to be ID alone.
class ToKV extends DoFn<Row, KV<String, Row>> {

    private static final long serialVersionUID = -8093837716944809689L;
    String columnName1 = null;
    String columnName2 = null;

    @ProcessElement
    public void processElement(ProcessContext context) {
        Row row = context.element();
        context.output(KV.of(row.getValue(columnName1).toString() + row.getValue(columnName2).toString(), row));
    }

    public void setColumnName1(String columnName1) {
        this.columnName1 = columnName1;
    }

    public void setColumnName2(String columnName2) {
        this.columnName2 = columnName2;
    }

}

class ForwardFillFn extends DoFn<KV<String, Iterable<Row>>, KV<String, Iterable<Row>>> {

    // Model and Prediction are classes from the stateful-processing blog post's
    // example; they do not exist in this project, which is why they fail to resolve.
    @StateId("model")
    private final StateSpec<ValueState<Model>> modelSpec =
            StateSpecs.value(Model.coder());

    @StateId("previousPrediction")
    private final StateSpec<ValueState<Prediction>> previousPredictionSpec =
            StateSpecs.value(Prediction.coder());

    @StateId("index")
    private final StateSpec<ValueState<Integer>> indexSpec = StateSpecs.value();

    @ProcessElement
    public void processElement(
            ProcessContext context,
            @StateId("index") ValueState<Integer> index) {
        // TODO: the forward-fill logic is what I cannot figure out.
    }
}

I have written this based on the examples from https://beam.apache.org/blog/stateful-processing/. I am stuck at writing the implementation of the ForwardFillFn class: I am unable to resolve the Model and Prediction classes, and I am also lost on how to implement processElement.

Comments:
  • Have you looked into using a [CombinePerKey](https://beam.apache.org/documentation/programming-guide/#combine)? That seems like it would do what you need. – Daniel Oliveira Oct 29 '21 at 02:14
  • @DanielOliveira, no, I have not used it. But on checking the link I could observe that the examples show how to obtain the sum/max/min of a collection, not how to modify element values. – Ron Santis Oct 29 '21 at 02:26
  • @DanielOliveira Kindly share a link with similar functionality; the examples I looked into were only about aggregation. – Ron Santis Oct 29 '21 at 03:36
  • I was considering a CombinePerKey that would aggregate each element into a sorted iterable and apply the forward fill. Alternatively, a GroupByKey followed by a custom ParDo which sorts each iterable and applies a forward fill would accomplish the same result (see the sketch after these comments). Either way, aggregation is required. Would this work? – Daniel Oliveira Nov 02 '21 at 01:46
  • @DanielOliveira, thanks for your suggestion. Based on it I found a blog post which suggests a similar approach to yours. I have made an effort to implement it using GroupByKey, but I am stuck implementing the ParDo. I have updated my question; kindly have a look at it. The blog post I was referring to is https://beam.apache.org/blog/stateful-processing – Ron Santis Nov 03 '21 at 02:30
  • Sorry it took me so long to get back to this. It looks like a similar question has been answered in that time, [maybe this answer will help](https://stackoverflow.com/questions/69803118/apache-beam-update-current-row-values-based-on-the-values-from-previous-row). In particular, the approach taken there is much simpler than using stateful processing for this problem. (Stateful processing would be very useful if this were a streaming pipeline, but since you use a csv I assume it's a batch pipeline.) – Daniel Oliveira Nov 06 '21 at 01:35
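
Following the GroupByKey-then-sort approach from the last two comments, here is a minimal sketch of what the filling ParDo could look like. It is an untested outline, not a verified answer: it assumes ToKV keys rows by ID alone (not ID + date), the dd/MM/yyyy date format, and the three-field schema above.

import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

// Sorts each customer's rows by date, then emits one row per month in the
// range, carrying the last seen balance forward across the gaps.
class ForwardFill extends DoFn<KV<String, Iterable<Row>>, Row> {

    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    @ProcessElement
    public void processElement(ProcessContext context) {
        List<Row> rows = new ArrayList<>();
        context.element().getValue().forEach(rows::add);
        rows.sort(Comparator.comparing((Row r) -> LocalDate.parse(r.getString("date"), FMT)));

        Row previous = null;
        for (Row current : rows) {
            if (previous != null) {
                YearMonth month =
                        YearMonth.from(LocalDate.parse(previous.getString("date"), FMT)).plusMonths(1);
                YearMonth end = YearMonth.from(LocalDate.parse(current.getString("date"), FMT));
                // Emit a synthetic month-end row for every missing month.
                while (month.isBefore(end)) {
                    context.output(Row.withSchema(previous.getSchema())
                            .addValues(previous.getString("ID"),
                                    month.atEndOfMonth().format(FMT),
                                    previous.getDouble("balance"))
                            .build());
                    month = month.plusMonths(1);
                }
            }
            context.output(current);
            previous = current;
        }
    }
}

Wired in, it would replace the commented-out stateful step, e.g. groupedKVRows.apply(ParDo.of(new ForwardFill())).setCoder(RowCoder.of(beamSchema)).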
