ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
Expected output (March and April are missing; each missing month takes the previous month's balance, as in the standalone sketch after the table):
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/03/2021 | 200 |
01 | 30/04/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
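To make the fill rule concrete outside of Beam, here is a small standalone sketch of the logic I am after (my own illustration; it assumes dates are dd/MM/yyyy and every balance is recorded on the last day of its month):

import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.TreeMap;

public class FillSketch {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    // Walks month by month from the first to the last entry, carrying the
    // most recent balance into any missing month.
    static TreeMap<YearMonth, Double> forwardFill(TreeMap<YearMonth, Double> byMonth) {
        TreeMap<YearMonth, Double> filled = new TreeMap<>();
        double last = 0.0;
        for (YearMonth m = byMonth.firstKey(); !m.isAfter(byMonth.lastKey()); m = m.plusMonths(1)) {
            Double balance = byMonth.get(m);
            if (balance != null) {
                last = balance;  // month present: remember its balance
            }
            filled.put(m, last); // month absent: previous balance carried forward
        }
        return filled;
    }

    public static void main(String[] args) {
        TreeMap<YearMonth, Double> input = new TreeMap<>();
        input.put(YearMonth.from(LocalDate.parse("31/01/2021", FMT)), 100.0);
        input.put(YearMonth.from(LocalDate.parse("28/02/2021", FMT)), 200.0);
        input.put(YearMonth.from(LocalDate.parse("31/05/2021", FMT)), 500.0);
        input.put(YearMonth.from(LocalDate.parse("30/06/2021", FMT)), 600.0);
        // Prints {2021-01=100.0, 2021-02=200.0, 2021-03=200.0,
        //         2021-04=200.0, 2021-05=500.0, 2021-06=600.0}
        System.out.println(forwardFill(input));
    }
}

This is the per-customer behaviour I want the pipeline to reproduce.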
Java Code:
I read this file into a PCollection<Row>:
PCollection<Row> rows = /*Logic to read csv */
Updated Code:
CSV File:
ID | date | balance |
---|---|---|
01 | 31/01/2021 | 100 |
01 | 28/02/2021 | 200 |
01 | 31/05/2021 | 500 |
01 | 30/06/2021 | 600 |
Schema File (Avro):
{
  "type": "record",
  "name": "Entry",
  "namespace": "transform",
  "fields": [
    { "name": "ID", "type": [ "string", "null" ] },
    { "name": "date", "type": [ "string", "null" ] },
    { "name": "balance", "type": [ "double", "null" ] }
  ]
}
public static void main(String[] args) throws IOException {
final File schemaFile = new File("src/main/resources/schema_ie.avsc");
File csvFile = new File("src/main/resources/CustomerRequestIE.csv");
Schema schema = new Schema.Parser().parse(schemaFile);
Pipeline pipeline = Pipeline.create();
// Converting the Avro schema to a Beam schema
org.apache.beam.sdk.schemas.Schema beamSchema = AvroUtils.toBeamSchema(schema);
final PCollectionTuple tuples = pipeline
// Match the CSV input file
.apply("MatchFile", FileIO.match().filepattern(csvFile.getAbsolutePath()))
// Read the matched files
.apply("ReadMatches", FileIO.readMatches())
// Validate each record against the schema, convert it to a Row, and tag it
// as valid or invalid
.apply("ParseAndValidate", ParDo.of(new FileReader(beamSchema)).withOutputTags(FileReader.validTag(),
TupleTagList.of(FileReader.invalidTag())));
// Fetching only valid rows
final PCollection<Row> rows = tuples.get(FileReader.validTag()).setCoder(RowCoder.of(beamSchema));
ToKV toKV = new ToKV();
toKV.setColumnName1("ID");
toKV.setColumnName2("date");
PCollection<KV<String, Row>> kvRows = rows.apply(ParDo.of(toKV)).setCoder(KvCoder.of(StringUtf8Coder.of(), rows.getCoder()));
PCollection<KV<String,Iterable<Row>>> groupedKVRows = kvRows.apply(GroupByKey.<String,Row>create());
// groupedKVRows.apply(ParDo.of(new ForwardFillFn()));
// Run the pipeline once and block until it finishes
pipeline.run().waitUntilFinish();
System.out.println("The end");
}
class ToKV extends DoFn<Row, KV<String, Row>> {
private static final long serialVersionUID = -8093837716944809689L;
String columnName1 = null;
String columnName2 = null;
@ProcessElement
public void processElement(ProcessContext context) {
Row row = context.element();
// Key: the ID column concatenated with the date column
context.output(KV.of(row.getValue(columnName1).toString() + row.getValue(columnName2).toString(), row));
}
public void setColumnName1(String columnName1) {
this.columnName1 = columnName1;
}
public void setColumnName2(String columnName2) {
this.columnName2 = columnName2;
}
}
class ForwardFillFn extends DoFn<KV<String,Iterable<Row>>, KV<String,Iterable<Row>>>{
@StateId("model")
private final StateSpec<ValueState<Model>> modelSpec =
StateSpecs.value(Model.coder());
@StateId("previousPrediction")
private final StateSpec<ValueState<Prediction>> previousPredictionSpec =
StateSpecs.value(Prediction.coder());
@ProcessElement
public void processElement(
ProcessContext context,
@StateId("index") ValueState<Integer> index) {
}
}
I have written this based on the examples from https://beam.apache.org/blog/stateful-processing/. I am stuck on the implementation of the ForwardFillFn class: the Model and Prediction classes come from the blog post's example and do not resolve in my project. I am also lost on how to implement processElement.
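For reference, here is a rough sketch of the kind of processElement I have in mind, without the blog's stateful Model/Prediction machinery. It assumes the rows are keyed by the ID alone (my ToKV above keys by ID + date, which would leave every group with a single row, so the key would have to change) and that the DoFn emits plain Rows:

import java.time.LocalDate;
import java.time.YearMonth;
import java.time.format.DateTimeFormatter;
import java.util.TreeMap;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.Row;

class ForwardFillFn extends DoFn<KV<String, Iterable<Row>>, Row> {
    private static final long serialVersionUID = 1L;
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("dd/MM/yyyy");

    @ProcessElement
    public void processElement(ProcessContext context) {
        // With ID-only keys, the iterable holds every month recorded for
        // one customer; index the rows by month.
        TreeMap<YearMonth, Row> byMonth = new TreeMap<>();
        for (Row row : context.element().getValue()) {
            byMonth.put(YearMonth.from(LocalDate.parse(row.getString("date"), FMT)), row);
        }
        // Walk from the first to the last month: emit the original row when
        // the month is present, otherwise a copy of the previous row with
        // the date moved to the end of the missing month.
        Row previous = null;
        for (YearMonth m = byMonth.firstKey(); !m.isAfter(byMonth.lastKey()); m = m.plusMonths(1)) {
            Row row = byMonth.get(m);
            if (row == null) {
                row = Row.fromRow(previous)
                        .withFieldValue("date", m.atEndOfMonth().format(FMT))
                        .build();
            }
            context.output(row);
            previous = row;
        }
    }
}

Is this a reasonable direction, or is the stateful approach from the blog post the right tool here? The output coder would presumably still need to be set (for example RowCoder.of(beamSchema)).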