I have been working with Spark Datasets recently. I have a scenario where I have to generate a row number for each row and store it in a column named "Ids". The row numbers start from 1 and increment with each row in the dataset (in my case there are 10,000-20,000 records).
Consider a dataset 'empDataset' with these values:
name , dept , project
---------------------
Tina, Finance , abc
Leena, Finance , abc
Joe, Marketing , xyz
Now, for the above dataset, I want to add a column 'Ids' with values incrementing from 1, 2, 3, and so on.
The expected output is:
name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 3
I also want to store this output in another dataset and use it further for different transformations.
I need help solving this problem.
My code snippet:

LongAccumulator accum = spark.sparkContext().longAccumulator();
final long rowNumber = 1;

spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() {
    @Override
    public Long call(String namCol) throws Exception {
        accum.add(rowNumber);  // add 1 to the accumulator for each row
        System.out.println("inside " + accum.value());
        return accum.value();
    }
}, DataTypes.LongType);

Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids",
        callUDF("randomNumberGenerator", col("name")));

Dataset<Row> filterDept = empDatasetWithIds.filter(...here filtering with dept...);
The output I am getting in empDatasetWithIds is incorrect:
name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 1
The above code works fine when run in local mode, but in cluster mode the values do not increment across partitions.
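To illustrate what I believe is happening (this is my assumption, sketched in plain Java with no Spark): each task gets its own local copy of the accumulator, and reading `accum.value()` inside the UDF only sees that task's local additions, so the numbering restarts in every partition. The hypothetical split below (Tina and Leena in one partition, Joe in another) reproduces exactly the incorrect output I observe:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PartitionCounterDemo {
    public static void main(String[] args) {
        // Hypothetical partitioning: Spark might place the first two rows in
        // one partition and the third row in another.
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("Tina", "Leena"),
                Arrays.asList("Joe"));

        List<Long> ids = new ArrayList<>();
        for (List<String> partition : partitions) {
            // Each task starts with its own local copy of the counter, so
            // reading its value inside the task restarts the numbering.
            long localCount = 0;
            for (String row : partition) {
                localCount += 1;
                ids.add(localCount);
            }
        }
        System.out.println(ids); // prints [1, 2, 1] -- the incorrect Ids I see
    }
}
```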
I also went through this link: https://community.hortonworks.com/questions/36888/spark-java-accumulator-not-incrementing.html (Spark Java Accumulator not incrementing).
Spark accumulators require an action to trigger the job. In my scenario, I am further performing a filter transformation on the dataset. How can I solve this problem? I need help.