
I am new to Spark and I have around 10000 rows in a data file to read.

SparkSession sessionSpark = SparkSession
            .builder()
            .config(sparkConf)
            .getOrCreate();

Dataset<Row> dataset = sessionSpark.read().parquet("s3://databucket/files/");

I have a use case to add a row number to every row in the dataset. The row number should run from 1 to 10000 (since the file has 10000 records). Is it possible to assign such a row number, and, given that Spark shuffles data, can it be done so that even after running the same file through the application twice, the generated row numbers stay the same?

  • If you use the ```monotonically_increasing_id``` function, it has some drawbacks; for how to work around them, check this post: https://stackoverflow.com/questions/48209667/using-monotonically-increasing-id-for-assigning-row-number-to-pyspark-datafram – Srinivas May 28 '20 at 14:52
  • I will post a fuller answer, but in Scala. – thebluephantom May 28 '20 at 16:27

3 Answers


monotonically_increasing_id() will add an incremental ID to your rows:

import static org.apache.spark.sql.functions.monotonically_increasing_id;

Dataset<Row> dataset = sessionSpark.read().parquet("s3://databucket/files/")
        .withColumn("rowNum", monotonically_increasing_id());

From the official Spark docs:

A column expression that generates monotonically increasing 64-bit integers.

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.

As an example, consider a DataFrame with two partitions, each with 3 records. This expression would return the following IDs:

0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594
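
To make that layout concrete, here is a small sketch (not from the original answer) that unpacks one of the example IDs into the partition ID and in-partition record number the docs describe:

long id = 8589934593L;                          // (1L << 33) + 1, from the example above
long partitionId = id >>> 33;                   // upper 31 bits  -> 1
long recordInPartition = id & ((1L << 33) - 1); // lower 33 bits  -> 1

Because the ID embeds the partition ID, repartitioning the data between runs can change the generated values, which is the caveat raised in the comments below.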

QuickSilver
  • 3,915
  • 2
  • 13
  • 29
  • Are you 100% sure this will keep the IDs always the same? Like ID 1 will always be record X? – Saša Zejnilović May 28 '20 at 13:50
  • So what you quote means that if I change the number of partitions I get different IDs? I am interested in this myself because it is a use case I have, and I used hashes to be 100% sure it is the same. – Saša Zejnilović May 28 '20 at 13:57
  • No, the quote means that the ID will be a unique number for each row in any DataFrame/Dataset. It is just that the sets of unique IDs are allocated across partitions in order to make the computation faster. – QuickSilver May 28 '20 at 14:01
  • Cool. I will use it personally and you deserve a +1, but I think this does not do what was asked. It is not 1 to 10000, consecutive. It is "random". – Saša Zejnilović May 28 '20 at 14:12
  • Annoyed with the -1 vote I got; it is indeed zipWithIndex that answers your question. – thebluephantom May 28 '20 at 16:11

EDIT: A solution that keeps the IDs consecutive and starting at 1.

If you can order the rows by something, it should be possible. This example is in Scala, but the main part is the SQL, which works the same way from Java.

val df = sc.parallelize(Seq(("alfa", 10), ("beta", 20), ("gama", 5))).toDF("word", "count")
df.createOrReplaceTempView("wordcount")

// MAIN PART
val tmpTable = spark.sqlContext.sql("select row_number() over (order by count) as index,word,count from wordcount")

tmpTable.show()

+-----+----+-----+
|index|word|count|
+-----+----+-----+
|    1|gama|    5|
|    2|alfa|   10|
|    3|beta|   20|
+-----+----+-----+
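
For the Java side of the question, a minimal sketch of the same row_number() approach using the DataFrame API rather than SQL; "someOrderColumn" is a placeholder for a column that orders the data deterministically, which is what makes reruns produce the same numbering.

import static org.apache.spark.sql.functions.row_number;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

// Without a partitionBy clause the window moves all rows to one partition;
// that is acceptable for ~10K rows but will trigger a warning.
WindowSpec w = Window.orderBy("someOrderColumn");
Dataset<Row> numbered = dataset.withColumn("index", row_number().over(w));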

EDIT: If you don't need plain numbers, go with a hash of the row instead. It is better.
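
As a rough Java sketch of that hash idea (an illustration, not part of the original answer): derive a stable identifier from the row's own contents, so the same input row gets the same value on every run, independent of partitioning. The column names below are placeholders for whatever columns identify a row.

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.sha2;

// "colA" and "colB" are hypothetical identifying columns.
Dataset<Row> withHash = dataset.withColumn(
        "row_hash",
        sha2(concat_ws("||", col("colA"), col("colB")), 256));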

  • Without a partition by clause, won't this pull all of the data to a single partition, which may cause performance problems? I solved that particular problem when using dense_rank by adding a dummy partitionBy(lit(0)) clause to my WindowSpec. – Terry Dactyl May 28 '20 at 15:26

Not in Java, as I do not specialize in that, but in Scala; it should be easy enough for you to convert. Just an example I have using a Dataset with case classes:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.{Encoder, Encoders}
import spark.implicits._

// Gen some example data via DF, can come from files, ordering in those files assumed. I.e. no need to sort.
val df = Seq(
  ("1 February"), ("n"), ("c"), ("b"), 
  ("2 February"), ("hh"), ("www"), ("e"), 
  ("3 February"), ("y"), ("s"), ("j"),
  ("1 March"), ("c"), ("b"), ("x"),
  ("1 March"), ("c"), ("b"), ("x"),
  ("2 March"), ("c"), ("b"), ("x"),
  ("3 March"), ("c"), ("b"), ("x"), ("y"), ("z")
           ).toDF("line")

// Define Case Classes to avoid Row aspects on df --> rdd --> to DF. 
case class X(line: String)   
case class Xtra(key: Long, line: String)

// Add the seq num using zipWithIndex. Then convert back, but you will have a struct to deal with.
// You can avoid the struct by using Row and such. But the general idea should be clear.
val rdd = df.as[X].rdd.zipWithIndex().map{case (v,k) => (k,v)}
val ds = rdd.toDF("key", "line").as[Xtra]
ds.show(100,false)

returns:

+---+------------+
|key|line        |
+---+------------+
|0  |[1 February]|
|1  |[n]         |  
|2  |[c]         |
...

The answers to date do not fully meet the needs the question describes, but if there are only 10K rows then the single partition is not an issue. Although with just 10K rows, one has to ask a few questions about the approach.

If you don't mind Row, here is another approach:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, LongType}

val df = sc.parallelize(Seq((1.0, 2.0), (0.0, -1.0), (3.0, 4.0), (6.0, -2.3))).toDF("x", "y")

// Extend the existing schema with a non-nullable rowid column.
val newSchema = StructType(df.schema.fields ++ Array(StructField("rowid", LongType, false)))

// zipWithIndex assigns consecutive, 0-based indices following the RDD's order.
val rddWithId = df.rdd.zipWithIndex

// Append the index to each row and rebuild the DataFrame with the new schema.
val dfZippedWithId = spark.createDataFrame(rddWithId.map{ case (row, index) => Row.fromSeq(row.toSeq ++ Array(index))}, newSchema)
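
Since the question is in Java, here is a hedged Java sketch of the same Row-plus-zipWithIndex idea. It assumes the dataset and sessionSpark variables from the question; the rowNum column name is illustrative, and the index is shifted by 1 so numbering starts at 1 and stays stable across reruns as long as the input order is stable.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import scala.Tuple2;

// Extend the existing schema with a non-nullable rowNum column.
StructType newSchema = dataset.schema().add("rowNum", DataTypes.LongType, false);

// zipWithIndex assigns consecutive, 0-based indices following the RDD's order.
JavaRDD<Row> withIndex = dataset.toJavaRDD()
        .zipWithIndex()
        .map((Tuple2<Row, Long> t) -> {
            Object[] fields = new Object[t._1().size() + 1];
            for (int i = 0; i < t._1().size(); i++) {
                fields[i] = t._1().get(i);
            }
            fields[t._1().size()] = t._2() + 1; // shift so numbering starts at 1
            return RowFactory.create(fields);
        });

Dataset<Row> numbered = sessionSpark.createDataFrame(withIndex, newSchema);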
thebluephantom