I am trying to transform the input data form Dataset on the fly, is it possible to store the data in arrayList and iterate them all again to group them by date and ouput it. Here is the code I tried. Some reason the intermediate storage Array is not populated and the forloop is hanging before output. What would be the better solution to group all values for the day and output in Tuple format.
StructType inputSchema = DataTypes.createStructType(
new StructField[] { DataTypes.createStructField("dateval", DataTypes.DateType, false),
DataTypes.createStructField("id", DataTypes.StringType, false),
DataTypes.createStructField("values", DataTypes.StringType, false), });
java.sql.Date sqlDate1 = java.sql.Date.valueOf("2018-03-09");
java.sql.Date sqlDate2 = java.sql.Date.valueOf("2018-03-09");
java.sql.Date sqlDate3 = java.sql.Date.valueOf("2018-03-10");
List<Row> rowData= new ArrayList<Row>();
Row r1 = RowFactory.create(sqlDate1, "item1", "val1");
Row r2 = RowFactory.create(sqlDate2, "item2", "val1");
Row r3 = RowFactory.create(sqlDate3, "item3", "val1");
rowData.add(r1);
rowData.add(r2);
rowData.add(r3);
Dataset<Row> ds = sessionBuilder.createDataFrame(rowData, inputSchema).toDF();
ds.show(false);
Set<java.sql.Date> dateSet= new HashSet<java.sql.Date>();
Dataset< Tuple2<java.sql.Date,String[]>> gPart = ds.mapPartitions(func ->{
List<Tuple2<java.sql.Date,String[]>> inputList = new ArrayList<Tuple2<java.sql.Date,String[]>>();
while (func.hasNext()) {
Row row = func.next();
java.sql.Date dateVal= row.getDate(0);
dateSet.add(dateVal);
mMap.put(dateVal,new String[]{row.getString(1),row.getString(2)});
//inputList.add(new Tuple2<java.sql.Date,String[]>(dateVal,new String[]{row.getString(2),row.getString(3)}));
}
for(java.sql.Date in :dateSet ) {
//look through all dates
String[] g=null;
Collection<String[]> gt= mMap.get(in);
while(gt.iterator().hasNext()){
g=gt.iterator().next();
}
inputList.add(new Tuple2<java.sql.Date,String[]>(java.sql.Date.valueOf(in.toString()),g));
}
return inputList.iterator();
}, Encoders.tuple( Encoders.DATE(),sessionBuilder.implicits().newStringArrayEncoder())
);
gPart.show(65,false);