This is from my experience. monotonically_increasing_id() has some gnarliness to it. For small use cases you will always get a generally increasing ID. However, if you have complex shuffles or data usage issues, it can and will not increase by the same value each tick. By this I mean DF1 went from 1 -> ~100000000, but during a reshuffle DF2 was recalculated again by Spark's lazy implementation and went from 1 -> ~48000000, then 48000001.23 -> 100000000.23. This meant I lost a ton of rows.
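For context, here is a minimal sketch of the kind of call I mean (someDF and the column name are just illustrative, not my actual code):

import org.apache.spark.sql.functions.monotonically_increasing_id
//The generated IDs are unique and increasing, but only within a partition:
//the partition ID is encoded in the upper bits, so values are neither
//contiguous nor stable if the DataFrame is repartitioned and recomputed.
val withId = someDF.withColumn("row_id", monotonically_increasing_id())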
How I solved the problem was through unique Row_IDs. To do this I had a function, Row_Hash (below), that would go through and build a unique row ID as a new column at the front of the data frame. No matter how many shuffles or data writes there were, I maintained the uniqueness of my join conditions.
EDIT: What I am going to do is turn all elements of the metadata of the data frame into arrays. The reason for this is that you can specify which elements of an array you want to query. This is different from a data frame: because of shuffles and repartitions, calling take(n) might give different results, but calling array(n) will always output the same results.
With this in mind, let's return to the problem: we need to create a local row identifier where there is none. To do this we fully concatenate the rows (this is for scenarios where there are no row keys) and call MD5 on top of the product (yes, there is a chance of collision, but it is exceedingly low). This yields a long string for each row, keeping it separate from the rest of the system and allowing the user to use it as a unique row-join key.
//Call in the input data frame
val inputDF = ...
//Returns an array of strings of the columns of the input dataframe
val columnArray = inputDF.columns
//In Scala a var allows us to dynamically augment and update the value
//This is the start of the command where we are concatenating all fields and running an MD5; we just need to add in the other fields.
var commandString = "md5(concat("
//This will be a set of strings of actions we want Spark to run on our columns.
//The reason we are passing through the names is because we want to return the base columns.
//Think of a select query
var commandArray = columnArray
//This is an iterator where we are going to move 1->n, n being the number of columns
var columnIterator = 1
//Run while there are still columns we have not acted upon.
while(columnIterator<=columnArray.length) {
//We take the Nth element of the columns and append a statement casting it as a string
commandString = commandString + "cast(" + columnArray(columnIterator-1) + " as string)"
//This check tests whether we are not at the last element of the column array; if so we add
//a comma, which allows N many elements to be concatenated (I add the space because it is aesthetically pleasing)
if (columnIterator!=columnArray.length) {commandString = commandString + ", "}
//Iterate
columnIterator = columnIterator + 1
}
//I am prepending the command we just built to the front of the command array with
//a few extra characters to close the expression and name it something consistent.
//So if we have a DF of Name, Addr, Date; the statement below will look like
//Array("md5(concat(cast(Name as string), cast(Addr as string), cast(Date as string))) as Row_Hash","Name","Addr","Date")
commandArray = Array(commandString + ")) as Row_Hash") ++ commandArray
//selectExpr runs independent strings through a standard SQL framework (kinda a little bit of column A, a little bit of column B)
//Each string is its own element, so based on the above example DF
//inputDF.selectExpr("Name", "length(Addr) as Addr_Length", "Addr", "Date")
//will output a DF with four elements: Name, an integer of the length of column Addr, Addr, and Date.
//In the previous lines of code we have built those strings into the command array
//The usage of commandArray:_* means we want Spark to run all elements of the Array through the select statement.
val finalDF = inputDF.selectExpr(commandArray:_*)
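For what it's worth, the same idea can also be written more compactly with the built-in column functions instead of building the expression string by hand. This is just a rough sketch of that alternative, not the code I actually ran; it assumes the same inputDF and that casting every column to string is acceptable:

import org.apache.spark.sql.functions.{col, concat_ws, md5}
//Cast every column to string, concatenate with a separator, then hash.
//concat_ws skips nulls, whereas a plain concat of columns would return null
//for any row containing a null, so the separator version is a bit safer here.
val hashedDF = inputDF.withColumn(
  "Row_Hash",
  md5(concat_ws("|", inputDF.columns.map(c => col(c).cast("string")): _*))
)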