I was trying to concatenate two dataframes side by side, and I came across the approach of joining them on monotonically_increasing_id(). The description for monotonically_increasing_id() says:

"monotonically_increasing_id() - Returns monotonically increasing 64-bit integers. The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the lower 33 bits represent the record number within each partition. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records. The function is non-deterministic because its result depends on partition IDs."

I'm trying to understand how we can presume that monotonically_increasing_id() produces the same IDs for both of these dataframes to join on, since it is non-deterministic. If it produces different row numbers for the two dataframes, they wouldn't join correctly. The 'result depends on partition IDs' part might be the answer, but I don't understand it. Can someone explain?
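
To be concrete, the pattern I'm asking about is roughly the following (df1 and df2 are just placeholders for my two dataframes):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

# Two dataframes with the same number of rows (placeholder data)
df1 = spark.createDataFrame([("a",), ("b",), ("c",)], ["col1"])
df2 = spark.createDataFrame([(1,), (2,), (3,)], ["col2"])

# Tag each dataframe with an ID and join the two on it
df1_id = df1.withColumn("id", F.monotonically_increasing_id())
df2_id = df2.withColumn("id", F.monotonically_increasing_id())

joined = df1_id.join(df2_id, "id", "inner").drop("id")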

user2816215

2 Answers

This is the best way that I've found so far to add an index to a dataframe df:

new_columns = df.columns + ["row_idx"]

# Adding row index
df = df\
    .rdd\
    .zipWithIndex()\
    .map(lambda row_index: row_index[0] + (row_index[1],))\
    .toDF()

# Renaming all the columns
df = df.toDF(*new_columns)

It does have the overhead of converting to an RDD and back to a DataFrame. However, monotonically_increasing_id() is non-deterministic, and row_number() requires a Window, which may not be ideal unless it is used with PARTITION BY; otherwise it shuffles all the data to one partition, defeating the purpose of PySpark. That Window-based alternative is sketched below for comparison.
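
This is roughly what the Window-based alternative looks like (a sketch only, with placeholder data and column names; the missing partitionBy is what forces everything into a single partition, and Spark logs a warning about it):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("b", 2), ("a", 1)], ["some_col", "val"])  # placeholder data

# No partitionBy here, so Spark moves all rows into a single partition
# to assign the row numbers (it logs a warning about this).
w = Window.orderBy("some_col")
df_with_idx = df.withColumn("row_idx", F.row_number().over(w) - 1)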

So, to add a list lst as a new column in a dataframe, simply convert the list to its own dataframe

new_df = spark.createDataFrame([(l,) for l in lst], ['new_col'])

and add a row index to it in the same way as above. Then join:

joined_df = df.join(new_df, ['row_idx'], 'inner')
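
Putting it all together, a minimal end-to-end sketch might look like this (the helper name add_row_idx and the sample data are just for illustration):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

def add_row_idx(df):
    # Zip each row with its index and append the index as a new column
    new_columns = df.columns + ["row_idx"]
    return df.rdd \
        .zipWithIndex() \
        .map(lambda row_index: row_index[0] + (row_index[1],)) \
        .toDF(new_columns)

df = spark.createDataFrame([("a",), ("b",), ("c",)], ["col1"])  # placeholder data
lst = [10, 20, 30]                                              # placeholder list

new_df = spark.createDataFrame([(l,) for l in lst], ["new_col"])
joined_df = add_row_idx(df).join(add_row_idx(new_df), ["row_idx"], "inner")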
user2816215

This is from my experience. monotonically_increasing_id() has some gnarliness to it. For small use cases you will always get a generally increasing ID. However, if you have complex shuffles or data usage issues, it can and will not increase by the same value each tick. By this I mean DF1 went from 1 -> ~100000000, but during a reshuffle DF2 was recalculated again because of Spark's lazy evaluation, and it went from 1 -> ~48000000 and then from 48000001.23 -> 100000000.23. This meant I lost a ton of rows.

How I solved the problem was with unique row IDs. To do this I had a function, Row_Hash (below), that would go through and build a unique row ID at the front of the columns. No matter how many shuffles or data writes there were, I maintained the uniqueness of my join conditions.

EDIT: What I am going to do is turn all elements of the metadata of the data frame into arrays. The reason for this is that you can specify which elements of an array you want to query. This is different from a data frame: because of shuffles and repartitions, calling take(n) might give different results, whereas calling array(n) will always output the same results.

With this in mind, let's return to the problem: we need to create a local row identifier where there is none. To do this we fully concatenate the rows (this is for scenarios where there are no row keys) and call an MD5 on top of the product (yes, there is a chance of collision, but it is exceedingly low). This yields a large string for each row, independent of the rest of the system, which the user can use as a unique row-join key.

// Call in the input data frame
val inputDF = ...

// Returns an array of strings with the columns of the input dataframe
val columnArray = inputDF.columns

// In Scala a var allows us to dynamically augment and update the value.
// This is the start of the command where we are concatenating all fields and running an MD5;
// we just need to add in the other fields.
var commandString = "md5(concat("
// This will be a set of strings of actions we want Spark to run on our columns.
// The reason we are passing through the names is because we want to return the base columns.
// Think of a select query.
var commandArray = columnArray

// This is an iterator where we are going to move 1->n, n being the number of columns
var columnIterator = 1

// Run while there are still columns we have not acted upon.
while(columnIterator<=columnArray.length) {

  // We take the Nth element of the columns and append a statement that casts it as a string
  commandString = commandString + "cast(" + columnArray(columnIterator-1) + " as string)"

  // This check adds a comma if we are not at the last element of the column array,
  // which allows N many elements to be concatenated (I add the space because it is aesthetically pleasing)
  if (columnIterator!=columnArray.length) {commandString = commandString + ", "}
  // Iterator
  columnIterator = columnIterator + 1
}

// I am prepending the command we just built to the front of the command array, with
// a few extra characters to close the expression and name it something consistent.
// So if we have a DF of Name, Addr, Date, the statement below will look like
// Array("md5(concat(cast(Name as string), cast(Addr as string), cast(Date as string))) as Row_Hash","Name","Addr","Date")
val finalCommandArray = Array(commandString + ")) as Row_Hash") ++ commandArray

// selectExpr runs independent strings through a standard SQL framework (kinda little bit of column A, column B).
// Each string is its own element, so based on the above example DF,
// inputDF.selectExpr("Name", "length(Addr) as Addr_Length", "Addr", "Date")
// will output a DF with four elements: Name, an integer with the length of column Addr, Addr, and Date.
// In the previous lines of code we have built those strings into the command array.
// The usage of finalCommandArray:_* means we want Spark to run all elements of the Array through the select statement.
val finalDF = inputDF.selectExpr(finalCommandArray:_*)
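
For anyone who wants the same row-hash idea from PySpark rather than Scala, a minimal sketch could look like the following (the column name row_hash and the sample data are just for illustration; concat_ws is used here so that null columns don't null out the whole hash, which is a slight variation on the concat used above):

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame([("Alice", "1 Main St"), ("Bob", "2 Oak Ave")], ["Name", "Addr"])  # placeholder data

# Cast every column to string, concatenate, and hash the result,
# giving each row a join key that does not depend on partitioning.
row_hash = F.md5(F.concat_ws("|", *[F.col(c).cast("string") for c in df.columns]))
final_df = df.withColumn("row_hash", row_hash)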
afeldman