We have a requirement to set a counter value based on the input records.

For example:
    record 1 ->    {subtradeid:100,amount:1000}
    record 2 ->    {subtradeid:100,amount:8000}
    record 3 ->    {subtradeid:200,amount:3000}
    record 4 ->    {subtradeid:300,amount:2000}
    record 5 ->    {subtradeid:300,amount:2000}
    record 6 ->    {subtradeid:500,amount:8000}
    record 7 ->    {subtradeid:500,amount:8000}
    record 8 ->    {subtradeid:500,amount:8000}

Whenever an executor processes these records, it should add a counter field to each record, and the counter value should be unique within each subtradeid.

For example, for all records with subtradeid=500, it should set a new field (e.g. portfolioid) whose values are contiguous and never duplicated, regardless of which executor processes the record. So if executor1 processes a record with subtradeid=500, it sets portfolioid=1, and if executor2 processes the next record with subtradeid=500, it should set portfolioid=2 (i.e. the next counter value).

user3811946

1 Answer

If I understand correctly, you want to add a counter column to the dataset that numbers the records within each subtradeid group. In batch, this can be done simply with window functions:

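import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}
import spark.implicits._ // needed for toDF; assumes a SparkSession named `spark` (as in spark-shell)
val df = List((1,100,1000),(2,100,8000),(3,200,3000),(4,300,2000),(5,300,2000),(6,500,8000),(7,500,8000),(8,500,8000)).toDF("record","subtradeid","amount")
// Number the rows within each subtradeid group, ordered by record
val w = Window.partitionBy(col("subtradeid")).orderBy(col("record"))

val res = df.withColumn("portfolioid",row_number().over(w))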

which contains (row order may vary)

+------+----------+------+-----------+
|record|subtradeid|amount|portfolioid|
+------+----------+------+-----------+
|     4|       300|  2000|          1|
|     5|       300|  2000|          2|
|     6|       500|  8000|          1|
|     7|       500|  8000|          2|
|     8|       500|  8000|          3|
|     1|       100|  1000|          1|
|     2|       100|  8000|          2|
|     3|       200|  3000|          1|
+------+----------+------+-----------+

edit: changed rank -> row_number to drop the assumption that the record field is a primary key
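To illustrate why that edit matters, here is a minimal sketch in plain Scala collections (no Spark involved) that mimics the two numbering schemes on a single subtradeid group. The `Rec` case class, the `RankVsRowNumber` object, and the sample rows with a duplicated `record` value are assumptions made up for the illustration; they are not part of the original data.

```scala
// Plain-Scala imitation of rank vs. row_number within one group.
// Rec and RankVsRowNumber are hypothetical names used only for this sketch.
object RankVsRowNumber {
  final case class Rec(record: Int, subtradeid: Int, amount: Int)

  // row_number semantics: position in the sorted group, always 1, 2, 3, ...
  def rowNumbers(group: List[Rec]): List[Int] =
    group.sortBy(_.record).indices.map(_ + 1).toList

  // rank semantics: 1 + number of rows with a strictly smaller ordering key,
  // so tied rows share a value and the following value is skipped.
  def ranks(group: List[Rec]): List[Int] = {
    val sorted = group.sortBy(_.record)
    sorted.map(r => sorted.count(_.record < r.record) + 1)
  }

  def main(args: Array[String]): Unit = {
    // Assumed data: two rows share record = 6, so record is not a unique key.
    val group = List(Rec(6, 500, 8000), Rec(6, 500, 8000), Rec(8, 500, 8000))
    println(rowNumbers(group)) // List(1, 2, 3)
    println(ranks(group))      // List(1, 1, 3)
  }
}
```

With ties in the ordering column, the rank-style numbering yields duplicates and gaps, while the row_number-style numbering stays contiguous. Note that in Spark the order in which row_number numbers tied rows is not guaranteed to be stable, so if that matters, order by a column (or combination of columns) that is unique within the group.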

Wilmerton