
I use Spark v1.6. I have the DataFrame below.

Primary_key | Dim_id
PK1 | 1
PK2 | 2
PK3 | 3

I would like to create a new DataFrame with new sequence numbers whenever new records come in. Let's say I get 2 new records from the source with the values PK4 and PK5; I would like to create new Dim_ids with the values 4 and 5, so my new DataFrame should look like the one below.

Primary_key | Dim_id
PK1 | 1
PK2 | 2
PK3 | 3
PK4 | 4
PK5 | 5

How do I generate a running sequence number for the new records in a Spark v1.6 DataFrame?
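
To make the expected outcome concrete, here is a rough sketch of the effect I'm after with plain Spark 1.6 APIs (existing and incoming are placeholder names, and Dim_id is assumed to be a long; I'm hoping there is something more robust than offsetting zipWithIndex by the current max):

import org.apache.spark.sql.Row
import org.apache.spark.sql.functions.max
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}

// Highest Dim_id already assigned in the target.
val maxId = existing.agg(max("Dim_id")).first().getLong(0)

// Offset a zero-based index by the current maximum to get the next ids.
val newRows = incoming.select("Primary_key").rdd.zipWithIndex().map {
  case (Row(pk: String), idx) => Row(pk, maxId + idx + 1)
}

val schema = StructType(Seq(
  StructField("Primary_key", StringType),
  StructField("Dim_id", LongType)))

// Spark 1.6 uses unionAll to append the newly keyed records.
val result = existing.unionAll(sqlContext.createDataFrame(newRows, schema))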

Prasan
  • What do you mean by two new records coming in? Is it related to streaming? – Shivansh Nov 29 '16 at 06:11
  • You could write a function and populate the second column yourself (say some hashing function on the PK that comes in). – rohitkulky Nov 29 '16 at 06:33
  • We read records from a source file and load them into the target. While loading into the target, we create new Dim_ids for the new records. Also, PK1/PK2 are sample values, not real values. – Prasan Nov 29 '16 at 15:04
  • Possible duplicate of [How to implement auto increment in spark SQL(PySpark)](https://stackoverflow.com/questions/40231328/how-to-implement-auto-increment-in-spark-sqlpyspark) – mrsrinivas Oct 15 '17 at 09:01

1 Answer


If you have a database somewhere, you can create a sequence in it and use it from a user-defined function (like you, I stumbled upon this problem).

Reserve a bucket of sequence numbers and iterate through it locally (the incrementBy parameter must match the INCREMENT BY used to create the sequence). Because it's an object, SequenceID is a singleton on each worker node, and the AtomicLong walks through the reserved bucket.

It's far from perfect (possible connection leaks, reliance on a DB, locks on a static class), so comments are welcome.

import java.sql.Connection
import java.sql.DriverManager
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.functions.udf

object SequenceID {
  // Position inside the bucket of ids reserved from the database sequence.
  var current: AtomicLong = new AtomicLong
  // Upper bound (exclusive) of the currently reserved bucket.
  var max: Long = 0
  var connection: Connection = null
  var connectionLock = new ReentrantLock
  var seqLock = new ReentrantLock
  // Lazily opens a single JDBC connection per worker JVM.
  def getConnection(): Connection = {
    if (connection != null) {
      return connection
    }
    connectionLock.lock()
    try {
      if (connection == null) {
        // create your jdbc connection here
      }
    } finally {
      connectionLock.unlock()
    }
    connection
  }

  def next(sequence: String, incrementBy: Long): Long = {
    if (current.get == max) {
      // Bucket exhausted: reserve a new one from the database sequence.
      seqLock.lock()
      try {
        if (current.get == max) {
          // NEXT VALUE FOR ... FROM sysibm.sysdummy1 is DB2 syntax; adapt to your database.
          val rs = getConnection().createStatement().executeQuery(s"SELECT NEXT VALUE FOR ${sequence} FROM sysibm.sysdummy1")
          rs.next()
          current.set(rs.getLong(1))
          max = current.get + incrementBy
        }
      } finally {
        seqLock.unlock()
      }
    }
    current.getAndIncrement
  }
}

// Serializable wrapper so the singleton can be referenced from inside a UDF.
class SequenceID() extends Serializable {
  def next(sequence: String, incrementBy: Long): Long =
    SequenceID.next(sequence, incrementBy)
}


val sequenceGenerator = new SequenceID()

// Zero-argument UDF that draws the next id from the shared sequence,
// reserving 500 ids at a time.
def sequenceUDF(seq: SequenceID) = udf[Long](() => {
  seq.next("PK_SEQUENCE", 500L)
})

val seq = sequenceUDF(sequenceGenerator)

myDataframe.select(myDataframe("foo"), seq().alias("Dim_id"))
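
Note that the database sequence must be created with an INCREMENT BY equal to the bucket size passed to next (500 above); otherwise buckets reserved by different executors will overlap. A minimal setup sketch, assuming DB2 and reusing the connection helper above:

// One-time setup, DB2 dialect assumed (adapt to your database).
// INCREMENT BY must equal the incrementBy passed to SequenceID.next.
val stmt = SequenceID.getConnection().createStatement()
stmt.executeUpdate("CREATE SEQUENCE PK_SEQUENCE START WITH 1 INCREMENT BY 500")
stmt.close()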
fylb