
I am trying to add a column to my DataFrame that serves as a unique ROW_ID for each row, so it would look something like this:

    1, user1
    2, user2
    3, user3
    ...

I could have done this easily with a HashMap and an incrementing integer, but I can't do that in Spark using the map function on the DataFrame, since I can't have an integer increasing inside the map function. Is there any way to do this by appending a column to my existing DataFrame, or in some other way?

PS: I know there is a very similar post, but that one is for Scala and not Java.

Thanks in advance

Zahra I.S

2 Answers


I did it by adding a new column containing UUIDs to the DataFrame.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.UUID;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;
    import org.apache.spark.sql.types.StructType;

    // Name for the new id column.
    final String rowIdCol = "ROW_ID";

    // Build the new schema: all of the original fields plus one nullable
    // String field appended at the end.
    StructType objStructType = inputDataFrame.schema();
    List<StructField> newfields =
            new ArrayList<StructField>(Arrays.asList(objStructType.fields()));
    newfields.add(DataTypes.createStructField(rowIdCol, DataTypes.StringType, true));
    final int size = objStructType.size();

    // Copy each row's values into a wider array and append a random UUID
    // as the last column.
    JavaRDD<Row> rowRDD = inputDataFrame.javaRDD().map(new Function<Row, Row>() {
        private static final long serialVersionUID = 3280804931696581264L;
        public Row call(Row tblRow) throws Exception {
            Object[] newRow = new Object[size + 1];
            for (int itr = 0; itr < tblRow.length(); itr++) {
                if (tblRow.apply(itr) != null) {
                    newRow[itr] = tblRow.apply(itr);
                }
            }
            newRow[size] = UUID.randomUUID().toString();
            return RowFactory.create(newRow);
        }
    });

    inputDataFrame = objsqlContext.createDataFrame(rowRDD, DataTypes.createStructType(newfields));
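
For reference: UUIDs are unique, but they are not the sequential integers the question asked for. Newer Spark versions also ship a built-in column function for unique row ids; a minimal sketch, assuming Spark 2.0+ where org.apache.spark.sql.functions.monotonically_increasing_id() is available (earlier releases called it monotonicallyIncreasingId()):

    import static org.apache.spark.sql.functions.monotonically_increasing_id;

    // Appends a 64-bit id that is unique across the DataFrame but not
    // consecutive: the partition id is encoded in the upper bits.
    inputDataFrame = inputDataFrame.withColumn("ROW_ID", monotonically_increasing_id());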
Harish Pathak

OK, I found the solution to this problem, and I'm posting it in case someone else runs into the same issue:

The way to do this is zipWithIndex from JavaRDD:

    df.javaRDD().zipWithIndex().map(new Function<Tuple2<Row, Long>, Row>() {
        @Override
        public Row call(Tuple2<Row, Long> v1) throws Exception {
            // keep the original first column and append the generated index
            return RowFactory.create(v1._1().getString(0), v1._2());
        }
    })
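
That snippet only keeps the first column. A fuller sketch of the same zipWithIndex approach that carries all the original columns and rebuilds the DataFrame (the column name ROW_ID and the sqlContext variable are assumptions; note that zipWithIndex triggers a separate Spark job when the RDD has more than one partition):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.function.Function;
    import org.apache.spark.sql.DataFrame;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.RowFactory;
    import org.apache.spark.sql.types.DataTypes;
    import org.apache.spark.sql.types.StructField;

    import scala.Tuple2;

    // Pair every row with a stable 0-based Long index, then append the
    // index as a new last column on each row.
    JavaRDD<Row> indexedRows = df.javaRDD().zipWithIndex().map(
        new Function<Tuple2<Row, Long>, Row>() {
            @Override
            public Row call(Tuple2<Row, Long> v1) throws Exception {
                Row row = v1._1();
                Object[] values = new Object[row.length() + 1];
                for (int i = 0; i < row.length(); i++) {
                    values[i] = row.apply(i);       // copy the original columns
                }
                values[row.length()] = v1._2();     // the generated index
                return RowFactory.create(values);
            }
        });

    // Extend the original schema with a LongType field and rebuild the DataFrame.
    List<StructField> fields =
            new ArrayList<StructField>(Arrays.asList(df.schema().fields()));
    fields.add(DataTypes.createStructField("ROW_ID", DataTypes.LongType, false));
    DataFrame dfWithId = sqlContext.createDataFrame(indexedRows, DataTypes.createStructType(fields));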

Zahra I.S