If you have a database somewhere, you can create a sequence in it and use it from a user-defined function (like you, I stumbled upon this problem).
Reserve a bucket of sequence numbers, and use it locally (the `incrementBy` parameter must be the same as the one used to create the sequence). Because it is an object, `SequenceID` will be a singleton on each worker node, and you can iterate over the bucket of sequence numbers using the `AtomicLong`.
It's far from perfect (possible connection leaks, relies on a DB, locks on a static object, etc.) — comments welcome.
import java.sql.Connection
import java.sql.DriverManager
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicLong
import org.apache.spark.sql.functions.udf
/**
 * Per-JVM singleton that hands out unique IDs from a database sequence.
 *
 * It reserves a "bucket" of `incrementBy` consecutive values with a single
 * round-trip (`SELECT NEXT VALUE FOR ...`) and serves them locally from an
 * [[AtomicLong]] until the bucket is exhausted, then reserves the next one.
 * `incrementBy` MUST match the `INCREMENT BY` used when the sequence was
 * created, otherwise buckets reserved by different nodes overlap.
 */
object SequenceID {
  // Next candidate value within the currently reserved bucket.
  val current: AtomicLong = new AtomicLong
  // Exclusive upper bound of the reserved bucket. @volatile so that the
  // unlocked fast-path read in next() sees refills done under seqLock.
  @volatile var max: Long = 0
  // Lazily created JDBC connection. @volatile so the unlocked fast-path
  // read in getConnection() sees the assignment made under connectionLock.
  @volatile var connection: Connection = null
  val connectionLock = new ReentrantLock
  val seqLock = new ReentrantLock

  /**
   * Returns the shared JDBC connection, creating it on first use.
   * The unlock is in a finally block so a failure while connecting
   * cannot leave the lock held forever.
   */
  def getConnection(): Connection = {
    val existing = connection
    if (existing != null) {
      return existing
    }
    connectionLock.lock()
    try {
      if (connection == null) {
        // create your jdbc connection here, e.g.:
        // connection = DriverManager.getConnection(url, user, password)
      }
      connection
    } finally {
      connectionLock.unlock()
    }
  }

  /**
   * Returns the next unique value of the given sequence.
   *
   * Fast path: atomically claim a candidate and return it if it is still
   * inside the reserved bucket (strictly below `max`). Otherwise reserve a
   * fresh bucket from the database under `seqLock` and retry; claims that
   * raced past the bucket boundary loop and draw from the new bucket, so a
   * value >= max is never handed out (the original unguarded
   * getAndIncrement could leak values past the bucket, duplicating IDs
   * reserved by other nodes).
   *
   * @param sequence    name of the database sequence to draw from
   * @param incrementBy bucket size; must equal the sequence's INCREMENT BY
   */
  def next(sequence: String, incrementBy: Long): Long = {
    while (true) {
      val candidate = current.getAndIncrement
      if (candidate < max) {
        return candidate
      }
      // Bucket exhausted (or raced past it): reserve a new one.
      seqLock.lock()
      try {
        // Re-check under the lock: another thread may have refilled already.
        if (current.get >= max) {
          val stmt = getConnection().createStatement()
          try {
            val rs = stmt.executeQuery(s"SELECT NEXT VALUE FOR ${sequence} FROM sysibm.sysdummy1")
            try {
              if (!rs.next()) {
                throw new IllegalStateException(s"sequence ${sequence} returned no value")
              }
              val start = rs.getLong(1)
              current.set(start)
              max = start + incrementBy
            } finally {
              rs.close() // always release the result set
            }
          } finally {
            stmt.close() // always release the statement (original leaked both)
          }
        }
      } finally {
        seqLock.unlock() // never leave the lock held after an SQLException
      }
    }
    // Unreachable: the loop only exits via return or throw.
    throw new IllegalStateException("unreachable")
  }
}
/**
 * Serializable facade shipped to Spark executors; every call simply
 * delegates to the per-JVM [[SequenceID]] singleton object, so each
 * worker node draws from its own locally reserved bucket.
 */
class SequenceID() extends Serializable {
  /** Next unique value of `sequence`; `incrementBy` must match the sequence's INCREMENT BY. */
  def next(sequence: String, incrementBy: Long): Long =
    SequenceID.next(sequence, incrementBy)
}
// Driver-side wiring. NOTE: the class declared above takes no constructor
// arguments, so the original `new SequenceID(properties)` did not compile
// (`properties` was also undefined) — fixed to the no-arg constructor.
val sequenceGenerator = new SequenceID()

// Wraps the serializable generator in a zero-argument UDF returning Long.
// "PK_SEQUENCE" is the DB sequence name; 500L must equal its INCREMENT BY.
def sequenceUDF(seq: SequenceID) = udf[Long](() => {
  seq.next("PK_SEQUENCE", 500L)
})

val seq = sequenceUDF(sequenceGenerator)

// Append a generated unique-id column alongside the existing "foo" column.
myDataframe.select(myDataframe("foo"), seq())