
I'm working with Spark 1.5, using Java. I need to append an ID/index column to an existing DataFrame, for example:

+---------+--------+
|  surname|    name|
+---------+--------+
|    Green|    Jake|
| Anderson|  Thomas|
| Corleone| Michael|
|    Marsh|   Randy|
|  Montana|    Tony|
|    Green|   Julia|
|Brenneman|    Eady|
|   Durden|   Tyler|
| Corleone|    Vito|
|   Madiro|     Mat|
+---------+--------+

I want every row to be appended with an index, ranging between 1 and the number of records in the table. The index order does not matter; each row just needs a unique ID/index. This could be done by converting to an RDD, appending an index column, and converting back to a DataFrame with a modified StructType, but, if I understand correctly, that approach spends a lot of resources on the conversions, and there must be another way. The result must look like:

+---------+--------+---+
|  surname|    name| id|
+---------+--------+---+
|    Green|    Jake|  3|
| Anderson|  Thomas|  5|
| Corleone| Michael|  2|
|    Marsh|   Randy| 10|
|  Montana|    Tony|  7|
|    Green|   Julia|  1|
|Brenneman|    Eady|  8|
|   Durden|   Tyler|  9|
| Corleone|    Vito|  4|
|   Madiro|     Mat|  6|
+---------+--------+---+

Thank you.

Sergey Repnikov
  • Possible duplicate of [Primary keys with Apache Spark](http://stackoverflow.com/questions/33102727/primary-keys-with-apache-spark) – zero323 Aug 10 '16 at 14:02
  • The first solution he proposes (if I understood the Scala syntax correctly) is conversion into an RDD, etc. As for the second, I can't call that function from Java, and it generates unique values outside the required range, so the only remaining option is hash functions, which have unacceptable drawbacks. – Sergey Repnikov Aug 10 '16 at 14:09
  • Actually, my point here is that, given your requirements, there is no better solution than rdd -> zipWithIndex. Also, excluding the Python snippets, every piece of code there should be Java-compatible. – zero323 Aug 10 '16 at 14:37
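
For reference, here is a minimal Java sketch of the rdd -> zipWithIndex route discussed in these comments, written against the Spark 1.x DataFrame API. The names df (the original DataFrame) and sqlContext are assumptions, not part of the question:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import scala.Tuple2;

// df is the original DataFrame (surname, name); sqlContext is the usual SQLContext.
JavaRDD<Row> rowsWithId = df.javaRDD()
        .zipWithIndex()                              // pairs every Row with a 0-based Long index
        .map(new Function<Tuple2<Row, Long>, Row>() {
            @Override
            public Row call(Tuple2<Row, Long> t) {
                Row row = t._1();
                Object[] values = new Object[row.length() + 1];
                for (int i = 0; i < row.length(); i++) {
                    values[i] = row.get(i);          // copy the original columns
                }
                values[row.length()] = t._2() + 1;   // append the id, shifted to 1..N
                return RowFactory.create(values);
            }
        });

// extend the original schema with the new "id" column
List<StructField> fields = new ArrayList<StructField>(Arrays.asList(df.schema().fields()));
fields.add(DataTypes.createStructField("id", DataTypes.LongType, false));

DataFrame withId = sqlContext.createDataFrame(rowsWithId, DataTypes.createStructType(fields));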

5 Answers


I know this question was asked a while ago, but you can do it as follows:

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

w = Window.orderBy("myColumn")
withIndexDF = originalDF.withColumn("index", row_number().over(w))
  • myColumn: any specific column from your DataFrame.
  • originalDF: the original DataFrame without the index column.
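
For the Java API, a rough equivalent of the snippet above might look like this (a sketch only, assuming a Spark 2.x+ build where a DataFrame is a Dataset<Row>; originalDF and the ordering column are placeholders):

import static org.apache.spark.sql.functions.row_number;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// order by any existing column; the ordering only decides which row gets which index
WindowSpec w = Window.orderBy("surname");
Dataset<Row> withIndexDF = originalDF.withColumn("index", row_number().over(w));

As the comment below notes, an un-partitioned window pulls all rows into a single partition, so this can be slow on large tables.
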
dbustosp
  • When using a window without a partition clause, there will be a warning that all data falls into a single partition, with possible huge performance degradation. – Dmitry Feb 13 '20 at 07:58

The most concise way to do this with a Spark DataFrame:

.withColumn("idx",monotonically_increasing_id())

Complete documentation: https://docs.databricks.com/spark/latest/sparkr/functions/withColumn.html
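
And a minimal Java sketch of the same call, in case it helps; df is a placeholder, and monotonically_increasing_id is assumed to be available from org.apache.spark.sql.functions (recent Spark versions):

import static org.apache.spark.sql.functions.monotonically_increasing_id;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// ids are unique and increasing, but, as the comment below points out, not consecutive
Dataset<Row> withIdx = df.withColumn("idx", monotonically_increasing_id());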

Algorithman
  • From the question: > I want every row to be appended with an index, ranging between 1 and the number of records in the table. From the code for `monotonically_increasing_id()`: > The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. – Caleb Fenton Nov 14 '17 at 01:35

Folks, there is a good approach at:

DataFrame-ified zipWithIndex

It simulates the zipWithIndex method from the RDD API ... the first suggestion performs better, but so far there is no big problem with the pure DataFrame solution either (on a table of over 100M rows in my scenario).

Mário de Sá Vera

In Scala, first we need to create an indexing Array:

val indx_arr=(1 to your_df.count.toInt).toArray

indx_arr: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

Now we want to append this column to our DataFrame. First, we collect the DataFrame as an array, then zip it with our indx_arr, and then convert the newly created array back into an RDD. The final step is to get it as a DataFrame:

val final_df = sc.parallelize((your_df.collect.map(
    x => (x(0), x(1))) zip indx_arr).map(
    x => (x._1._1.toString, x._1._2.toString, x._2))).toDF("surname", "name", "id")

This is also an easy and straightforward method of appending an array of any kind to our Spark DataFrame.

Mahdi Ghelichi

You can use the withColumn function. Usage would be something like:

val myDF = existingDF.withColumn("index", floor(rand() * existingDF.count()) + 1)

(with floor and rand imported from org.apache.spark.sql.functions; note that random values are not guaranteed to be unique).