
I have been working with Spark Datasets recently and have a scenario where I need to generate a row number for each row and store it in a column named "Ids". The row number starts at 1, 2, 3... and increments based on the number of rows in the dataset (in my case there are 10,000-20,000 records).

Consider that I have a dataset 'empDataset' with these values:

name , dept , project
---------------------
Tina, Finance , abc
Leena, Finance , abc
Joe, Marketing , xyz

Now, for the above dataset, I want to add a column 'Ids' with values incrementing as 1, 2, 3, and so on.

The expected output is:

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 3

I also want to store this output in another dataset and use it further for other transformations.

I need help solving this problem.

My code snippet:

LongAccumulator accum = spark.sparkContext().longAccumulator();
long rowNumber = 1;

spark.udf().register("randomNumberGenerator", new UDF1<String, Long>() {

    @Override
    public Long call(String namCol) throws Exception {
        accum.add(rowNumber);
        System.out.println("inside " + accum.value());
        return accum.value();
    }
}, DataTypes.LongType);

Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids",
        callUDF("randomNumberGenerator", col("name")));

Dataset<Row> filterDept = empDatasetWithIds.filter(/* ... filtering on dept ... */);

The (incorrect) output I am getting for empDatasetWithIds is:

name , dept , project ,Ids
--------------------------
Tina, Finance , abc , 1
Leena, Finance , abc , 2
Joe, Marketing , xyz , 1

The above code works fine when run in local mode, but in cluster mode the values do not increment.

I also went through this link: https://community.hortonworks.com/questions/36888/spark-java-accumulator-not-incrementing.html (Spark Java Accumulator not incrementing).

Spark accumulators require an action to trigger the job. In my scenario, I further perform a filter transformation on the dataset. How can I solve this problem? Need help.

ShaksM
  • I am not sure this is exactly a duplicate. The reason why the code proposed here is not working is explained in the post that is supposedly being duplicated. Yet, the actual problem that is tackled here (indexing a dataframe in java) is clearly not. This problem is addressed in this other post https://stackoverflow.com/questions/55160683/scala-how-can-i-split-up-a-dataframe-by-row-number. Would it be worth it to have a solution in java, given that it is slightly more complicated? – Oli Mar 18 '19 at 09:14
  • @elisah Not convinced it is a duplicate, more wrong technique chosen. – thebluephantom Mar 19 '19 at 19:07
  • @Oli see comment – thebluephantom Mar 19 '19 at 19:07

3 Answers

2

Accumulators are variables used to accumulate data across the executors and send it back to the driver. If you read an accumulator's value from an executor, the behavior is not defined (AFAIK); you would probably get whatever has been accumulated for the local partition so far. Indeed, the goal of Spark is to run computations in parallel, so when you use an accumulator the data is accumulated for each partition in a separate accumulator, and these are then merged and sent back to the driver (map-reduce paradigm). You therefore cannot use an accumulator to share information between executors; that is just not what it is meant for.
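
For illustration, here is a minimal sketch of what an accumulator is meant for: executors add to it inside a transformation, and the total is read back only on the driver after an action has run. The badRecords accumulator and the null check on dept are made up for this example; spark and empDataset are the objects from the question.

import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.util.LongAccumulator;

// Hypothetical example: count rows with a missing dept while filtering them out.
LongAccumulator badRecords = spark.sparkContext().longAccumulator("badRecords");

Dataset<Row> cleaned = empDataset.filter((FilterFunction<Row>) row -> {
    boolean ok = !row.isNullAt(row.fieldIndex("dept"));
    if (!ok) {
        badRecords.add(1L);              // updated on the executors
    }
    return ok;
});

cleaned.count();                          // an action triggers the job
System.out.println(badRecords.value());   // read only on the driver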

What you can do instead is either use zipWithIndex from the RDD API if you need consecutive indices, or monotonically_increasing_id from the Spark SQL API if you just need increasing indices. The former triggers a small Spark job while the latter is almost free (no Spark job).

Option 1 (increasing but not necessarily consecutive indices)

yourDataframe.withColumn("id", functions.monotonically_increasing_id());

Option 2 (consecutive and increasing indices)

StructType schema = yourDataframe.schema()
        .add(new StructField("id", DataTypes.LongType, false, Metadata.empty()));

JavaRDD<Row> rdd = yourDataframe.toJavaRDD().zipWithIndex()
        .map(x -> {
            Row original = x._1();      // the original row
            Long index = x._2();        // 0-based index computed by zipWithIndex
            Object[] values = new Object[original.size() + 1];
            for (int i = 0; i < original.size(); i++) {
                values[i] = original.get(i);
            }
            values[original.size()] = index;
            return RowFactory.create(values);
        });

Dataset<Row> indexedData = spark.createDataFrame(rdd, schema);
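
For completeness, a small usage sketch: the indexed result is an ordinary Dataset, so further transformations such as the dept filter from the question work as usual (the "Finance" value comes from the question's sample data, and col is the usual static import from org.apache.spark.sql.functions):

// Keep transforming the indexed dataset like any other Dataset.
Dataset<Row> financeOnly = indexedData.filter(col("dept").equalTo("Finance"));
financeOnly.select("name", "dept", "id").show();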
Oli
  • Thank you!! Option 1, the suggested "monotonicallyIncreasingId()", helped to solve the issue :) – ShaksM Apr 07 '19 at 13:33
0

If strictly sequential values are not an issue, you could simply do as per below:

import org.apache.spark.sql.functions.monotonically_increasing_id 
import spark.implicits._

val ds = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0)),5).toDS   // Just a dummy DS

val newds = ds.withColumn("uniqueIdColumn", monotonically_increasing_id())

newds.show(false)

Try it and adapt to your own situation.

BTW: this is the wrong use of an accumulator.

thebluephantom
-1

For this functionality you could use row_number:

import org.apache.spark.sql.expressions.Window;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.row_number;

Dataset<Row> empDatasetWithIds = empDataset.withColumn("Ids",
    row_number().over(Window.orderBy(col("name"), col("dept"), col("project")))
);

Reference: https://stackoverflow.com/a/31077759

As pointed out in the comments, using a Window without a partition is very inefficient and should be avoided in production code that processes large data.
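
If a per-group sequence would be acceptable instead of one global sequence, adding a partitionBy avoids moving all the data into a single partition. This is only a sketch under that assumption (partitioning by dept is an example choice), and note that the numbering restarts within each dept, so it is not a global id:

// Row numbers restart at 1 within each dept.
Dataset<Row> perDeptIds = empDataset.withColumn("Ids",
    row_number().over(Window.partitionBy(col("dept")).orderBy(col("name"))));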

Your approach with the accumulator does not work (as explained in Why does worker node not see updates to accumulator on another worker nodes?), since Spark runs this code in different executors (different JVM processes running on different machines), and each one has its own copy of the accumulator.

shanmuga
    I highly recommend not to do that. Indeed, using a window without `partitionBy` will cause all the data to be put into one single partition which does not scale at all (you get this warning in spark `WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.`) – Oli Mar 16 '19 at 15:31
  • @Oli I agree this is inefficient. I would much prefer your solution using `zipWithIndex`. – shanmuga Mar 17 '19 at 11:16