
Below is the logic to add a sequence-number column to a DataFrame. It works as expected when I read data from delimited files. Today I have a new task: read the data from an Oracle table, add a sequence number, and process further. I am facing an issue with the logic below when the DataFrame is read from an Oracle table.

`oracleTableDF` is my DataFrame:

   // imports needed for the logic below
   import org.apache.spark.sql.Row
   import org.apache.spark.sql.types.{LongType, StructField, StructType}

   // creating sequence-number logic for SeqNum (1-based) via zipWithIndex
   val rowRDD = oracleTableDF.rdd.zipWithIndex()
     .map { case (row, idx) => Row.fromSeq((idx + 1L) +: row.toSeq) }

   // creating a StructType that prepends SeqNum to the existing schema
   val newStructure = StructType(StructField("SeqNum", LongType) +: oracleTableDF.schema.fields)

   // creating the new DataFrame with SeqNum
   oracleTableDF = spark.createDataFrame(rowRDD, newStructure)
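For context, the same zipWithIndex approach as a self-contained sketch on a small in-memory DataFrame (the toy column name `value` and variable names are made up for illustration; a live `SparkSession` named `spark` is assumed):

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{LongType, StructField, StructType}

val df = spark.range(3).toDF("value")               // toy stand-in for oracleTableDF

// pair each Row with its partition-ordered index, then prepend (index + 1)
val indexed = df.rdd.zipWithIndex()
  .map { case (row, idx) => Row.fromSeq((idx + 1L) +: row.toSeq) }

// prepend the SeqNum field to the original schema
val schema = StructType(StructField("SeqNum", LongType) +: df.schema.fields)

val dfWithSeq = spark.createDataFrame(indexed, schema)  // columns: SeqNum, value
```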

I am not able to locate the actual issue: the logic works as expected on the cluster when I read from files, and it also works as expected in local mode. It only fails when I read from the Oracle table.

Below is the error:

"ERROR scheduler.TaskSetManager: Task 0 in stage 1.0 failed 4 times; aborting job org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 4, xxxx, executor 1): java.lang.NoClassDefFoundError: Could not initialize class oracleDataProcess$"

Pravinkumar Hadpad

2 Answers


If all you need is to add a column to your DataFrame with an auto-increment integer value, you can use monotonicallyIncreasingId, which is of LongType:

val oracleTableDF2 = oracleTableDF.withColumn("SeqNum", monotonicallyIncreasingId)

[UPDATE]

Note that monotonicallyIncreasingId is deprecated. monotonically_increasing_id() should be used instead.
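For reference, the non-deprecated form would look like this (a minimal sketch, assuming Spark 2.x and the same `oracleTableDF` from the question):

```scala
import org.apache.spark.sql.functions.monotonically_increasing_id

// ids are guaranteed monotonically increasing and unique, but not consecutive
val oracleTableDF2 = oracleTableDF.withColumn("SeqNum", monotonically_increasing_id())
```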

Leo C
  • As per the suggestion at "https://stackoverflow.com/questions/35705038/how-do-i-add-an-persistent-column-of-row-ids-to-spark-dataframe", it is not a clear function, and it does not take any parameters. I need to pass some parameters to it. – Pravinkumar Hadpad Sep 11 '17 at 07:50
  • I believe issue [SPARK-14393](https://issues.apache.org/jira/browse/SPARK-14393) has already been resolved since `Spark 2.1`. You're right that `monotonicallyIncreasingId` doesn't take parameters. – Leo C Sep 11 '17 at 16:48
  • @LeoC, is there any way to get the values starting from 1? Right now it starts from 0. – ROOT Feb 07 '18 at 05:55
  • @user4342532, monotonically_increasing_id() guarantees only generating `Long`s in a monotonically increasing fashion. It may start from 0 or some `Long` integer. If you want to have control in the id assignment, you should consider using [zipWithIndex](http://stdatalabs.blogspot.com/2016/09/assigning-row-number-in-spark-using.html) on a RDD, or `row_number()` Window function as illustrated in part of this [blog post](https://hadoopist.wordpress.com/2016/05/24/generate-unique-ids-for-each-rows-in-a-spark-dataframe/). – Leo C Feb 07 '18 at 06:33
  • @LeoC, Thank you – ROOT Feb 07 '18 at 06:34
  • @LeoC, when I use monotonicallyIncreasingId, it prints values like 0, 8589934592, 17179869184, 25769803776, ... – ROOT Mar 06 '18 at 13:12
  • 1
    @user4342532, unfortunately monotonically_increasing_id() only guarantees the generated numbers are monotonically increasing. There is no guarantee they will be consecutive integers. – Leo C Mar 06 '18 at 16:10
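To illustrate the `row_number()` Window-function alternative mentioned in the comments, which does produce consecutive 1-based ids (a sketch; the ordering column `"id"` is hypothetical and must exist in your DataFrame):

```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.row_number

// A window with no partitioning pulls all rows into a single partition,
// so this suits small-to-medium DataFrames; Spark will warn accordingly.
val w = Window.orderBy("id")

val withSeq = oracleTableDF.withColumn("SeqNum", row_number().over(w))
```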

One option is to use monotonically_increasing_id() to create a new column with an incremental id:

val dataFrame = oracleTableDF.withColumn("incremental_id", monotonically_increasing_id())
Prasad Khode