
I am trying to transform the input Dataset on the fly. Is it possible to store the rows in an ArrayList and then iterate over them again to group them by date and output the result? Here is the code I tried. For some reason the intermediate storage is never populated and the for loop hangs before producing any output. What would be a better way to group all values for a day and output them in Tuple format?

    StructType inputSchema = DataTypes.createStructType(new StructField[] {
            DataTypes.createStructField("dateval", DataTypes.DateType, false),
            DataTypes.createStructField("id", DataTypes.StringType, false),
            DataTypes.createStructField("values", DataTypes.StringType, false) });

    java.sql.Date sqlDate1 = java.sql.Date.valueOf("2018-03-09");
    java.sql.Date sqlDate2 = java.sql.Date.valueOf("2018-03-09");
    java.sql.Date sqlDate3 = java.sql.Date.valueOf("2018-03-10");

    List<Row> rowData = new ArrayList<Row>();
    rowData.add(RowFactory.create(sqlDate1, "item1", "val1"));
    rowData.add(RowFactory.create(sqlDate2, "item2", "val1"));
    rowData.add(RowFactory.create(sqlDate3, "item3", "val1"));

    Dataset<Row> ds = sessionBuilder.createDataFrame(rowData, inputSchema).toDF();
    ds.show(false);
    // NOTE: dateSet and mMap are driver-side objects captured by the
    // mapPartitions lambda; each executor works on its own serialized copy,
    // so additions made inside the lambda never reach the driver.
    Set<java.sql.Date> dateSet = new HashSet<java.sql.Date>();

    Dataset<Tuple2<java.sql.Date, String[]>> gPart = ds.mapPartitions(func -> {
        List<Tuple2<java.sql.Date, String[]>> inputList = new ArrayList<Tuple2<java.sql.Date, String[]>>();

        while (func.hasNext()) {
            Row row = func.next();
            java.sql.Date dateVal = row.getDate(0);
            dateSet.add(dateVal);
            // mMap is a date -> String[] multimap declared elsewhere in my class
            mMap.put(dateVal, new String[] { row.getString(1), row.getString(2) });
            //inputList.add(new Tuple2<java.sql.Date, String[]>(dateVal, new String[] { row.getString(2), row.getString(3) }));
        }

        for (java.sql.Date in : dateSet) {
            // look through all dates
            String[] g = null;
            Collection<String[]> gt = mMap.get(in);
            // BUG (the hang): gt.iterator() creates a fresh iterator on every
            // call, so hasNext() is always true for a non-empty collection and
            // this loop never terminates; it should be: for (String[] v : gt) { g = v; }
            while (gt.iterator().hasNext()) {
                g = gt.iterator().next();
            }
            inputList.add(new Tuple2<java.sql.Date, String[]>(java.sql.Date.valueOf(in.toString()), g));
        }

        return inputList.iterator();
    }, Encoders.tuple(Encoders.DATE(), sessionBuilder.implicits().newStringArrayEncoder()));

    gPart.show(65, false);
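Edit: to clarify the grouping I am after, here is the same logic in plain Java with no Spark involved (the class and method names below are just for illustration). A plain map-of-lists does what my `mMap` multimap was meant to do, and iterating the collection directly avoids the loop that hangs:

```java
import java.sql.Date;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GroupByDateSketch {

    // Groups (date, id, value) rows by date. Each row is modeled as an
    // Object[] {Date, String, String} to keep the sketch Spark-free; in the
    // real job the same logic would run over the Rows of one partition.
    static Map<Date, List<String[]>> groupByDate(List<Object[]> rows) {
        Map<Date, List<String[]>> byDate = new LinkedHashMap<>();
        for (Object[] row : rows) {
            Date d = (Date) row[0];
            // computeIfAbsent replaces the hand-rolled multimap (mMap)
            byDate.computeIfAbsent(d, k -> new ArrayList<>())
                  .add(new String[] { (String) row[1], (String) row[2] });
        }
        return byDate;
    }

    public static void main(String[] args) {
        List<Object[]> rows = new ArrayList<>();
        rows.add(new Object[] { Date.valueOf("2018-03-09"), "item1", "val1" });
        rows.add(new Object[] { Date.valueOf("2018-03-09"), "item2", "val1" });
        rows.add(new Object[] { Date.valueOf("2018-03-10"), "item3", "val1" });

        Map<Date, List<String[]>> grouped = groupByDate(rows);
        for (Map.Entry<Date, List<String[]>> e : grouped.entrySet()) {
            // one output pair per date, with all values for that day together
            System.out.println(e.getKey() + " -> " + e.getValue().size() + " rows");
        }
        // prints:
        // 2018-03-09 -> 2 rows
        // 2018-03-10 -> 1 rows
    }
}
```

In the Spark job itself I assume the idiomatic equivalent would be `ds.groupByKey(row -> row.getDate(0), Encoders.DATE())` followed by `mapGroups`, which keeps all state inside the executors instead of in driver-side collections.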
Masterbuilder

  • From what I see you have two strange things: `dateSet` and `mMap` are defined outside of `mapPartitions` but used inside it. Do you realize that values added to these collections inside `mapPartitions` will be available only on the node where they were added? Not sure that is what you want. – Vladislav Varslavans May 23 '18 at 06:57
  • Please follow [these](https://stackoverflow.com/help/mcve) guidelines and provide a sample of your data and what you are trying to achieve – Vladislav Varslavans May 23 '18 at 07:01
  • @VladislavVarslavans For the record we have [Spark specific guidelines](https://stackoverflow.com/q/48427185/8371915) – Alper t. Turker May 23 '18 at 09:24
  • I have edited the question and added the input dataset – Masterbuilder May 23 '18 at 12:16

0 Answers