
I am looking for a way to merge two dataframes on two id columns, where a given pair of ids may appear either as (id1, id2) or as (id2, id1).

I found a similar question to what I am looking for here but it doesn't help me with getting the output I need.

I am working with two dataframes, df and temp, each of which has two columns of ids:

df:
    time    level_0   level_1
0   60.0    id1       id2
1   420.0   id2       id3
2   120.0   id3       id4
3   336.0   id4       id5

temp:
    number  level_0   level_1  length  width
0   0       id1       id2      25      2
1   1       id4       id5      56      3
2   2       id4       id3      12      7
3   3       id2       id3      750     2

I would like to merge them into a final dataframe containing level_0, level_1, time, number, length and width.

As you can see, the two ids in row 2 of temp are reversed, so a plain merge on both columns fails to match that row and I get None values in the selected columns. The accepted answer of the post linked above does not work for me either: it raises TypeError: '<' not supported between instances of 'tuple' and 'str'.

Is there a way of doing a merge based on two columns knowing that some of their values could be in reverse order?

Edit: After reading @SoheilPourbafrani's comment, I tried using sets to compare level_0 and level_1 in both dataframes. However, the rows in temp are not in the same order as in df, so I cannot compare the two dataframes row by row.

Here's the desired output for better understanding:

    time    level_0   level_1 number length  width
0   60.0    id1       id2     0      25      2
1   420.0   id2       id3     3      750     2
2   120.0   id3       id4     2      12      7
3   336.0   id4       id5     1      56      3
I.M.
  • Based on my understanding, you want to join `df` and `temp` on the column list `[level_0, level_1]`, and the issue is that you only care about the values, not the order they appear in. If that's the case, add a new column to both dataframes (use the `withColumn` method) with a function that maps `level_0` and `level_1` to a unique value regardless of their order. For example, if `(id1, id2) -> 1`, then `(id2, id1) -> 1` as well. Finally, join them on the new column. – Soheil Pourbafrani Jan 21 '21 at 11:59
  • Thank you for your input @SoheilPourbafrani! I tried mapping that unique value using sets but I came across a slight problem. It turns out the rows of `temp` and `df` are not in the same order and I cannot merge the two of them. – I.M. Jan 21 '21 at 13:08
  • Could you add your desired output to the question so I can have a better understanding of it? – Soheil Pourbafrani Jan 21 '21 at 13:27
  • Yes of course, I just did @SoheilPourbafrani. – I.M. Jan 21 '21 at 13:32

3 Answers


As I already mentioned in the comments, you need to add a join column to both DataFrames that encodes your business logic, and then simply join on it.

Spark Dataframe

Here we have a function that gets the value of the level_0 and level_1 columns as input and generates a unique string:

def generateJoinIdCol(col1, col2):
    # Sort the two ids so that (a, b) and (b, a) produce the same key
    first, second = sorted([col1, col2])
    return first + ':' + second

Next, we register the function as a Spark UDF:

from pyspark.sql.types import StringType
import pyspark.sql.functions as F
udfGenerateJoinIdCol = F.udf(generateJoinIdCol, StringType())

After that we add the new id column to the df and temp:

dfWithIdCol = df.withColumn('id', udfGenerateJoinIdCol('level_0', 'level_1'))
+-----+-------+-------+-------+
| time|level_0|level_1|     id|
+-----+-------+-------+-------+
| 60.0|    id1|    id2|id1:id2|
|420.0|    id2|    id3|id2:id3|
|120.0|    id3|    id4|id3:id4|
|336.0|    id4|    id5|id4:id5|
+-----+-------+-------+-------+

tempWithIdCol = temp.withColumn('id', udfGenerateJoinIdCol('level_0', 'level_1'))
+------+-------+-------+------+-----+-------+
|number|level_0|level_1|length|width|     id|
+------+-------+-------+------+-----+-------+
|     0|    id1|    id2|    25|    2|id1:id2|
|     1|    id4|    id5|    56|    3|id4:id5|
|     2|    id4|    id3|    12|    7|id3:id4|
|     3|    id2|    id3|   750|    2|id2:id3|
+------+-------+-------+------+-----+-------+

Finally, we join them:

res = dfWithIdCol.join(tempWithIdCol.select('id', 'number', 'length', 'width'), 'id')
+-------+-----+-------+-------+------+------+-----+
|     id| time|level_0|level_1|number|length|width|
+-------+-----+-------+-------+------+------+-----+
|id3:id4|120.0|    id3|    id4|     2|    12|    7|
|id4:id5|336.0|    id4|    id5|     1|    56|    3|
|id1:id2| 60.0|    id1|    id2|     0|    25|    2|
|id2:id3|420.0|    id2|    id3|     3|   750|    2|
+-------+-----+-------+-------+------+------+-----+

Update: Pandas Dataframe

I just noticed that by dataframe you meant a Pandas dataframe, not a Spark one! Here is the Pandas approach:

import pandas as pd

# initialize dataframes
# df
dfData = {'time': [60.0, 420.0, 120.0, 336.0], 'level_0': ['id1', 'id2', 'id3', 'id4'], 'level_1': ['id2', 'id3', 'id4', 'id5']}
df = pd.DataFrame(dfData, columns=['time', 'level_0', 'level_1'])

# temp
tempData = {'number': [0, 1, 2, 3], 'level_0': ['id1', 'id4', 'id4', 'id2'], 'level_1': ['id2', 'id5', 'id3', 'id3'], 'length': [25, 56, 12, 750], 'width': [2, 3, 7, 2]}
temp = pd.DataFrame(tempData, columns=['number', 'level_0', 'level_1', 'length', 'width'])

Then let's add the id column to both dataframes:

# generate id function
def generateJoinIdCol(col1, col2):
    # Sort the two ids so that (a, b) and (b, a) produce the same key
    first, second = sorted([col1, col2])
    return first + ':' + second

# add the id column to df
df['id'] = df.apply(lambda row: generateJoinIdCol(row.level_0, row.level_1), axis = 1)

# df
    time level_0 level_1       id
0   60.0     id1     id2  id1:id2
1  420.0     id2     id3  id2:id3
2  120.0     id3     id4  id3:id4
3  336.0     id4     id5  id4:id5


# add the id column to temp
temp['id'] = temp.apply(lambda row: generateJoinIdCol(row.level_0, row.level_1), axis = 1)

# temp
   number level_0 level_1  length  width       id
0       0     id1     id2      25      2  id1:id2
1       1     id4     id5      56      3  id4:id5
2       2     id4     id3      12      7  id3:id4
3       3     id2     id3     750      2  id2:id3

Finally, we just need to do an inner join on the new id column:

# Join
result = df.merge(temp, how = 'inner', on = 'id')[['time', 'level_0_x', 'level_1_x', 'number', 'length', 'width']]

# rename columns
result.rename(columns={'level_0_x': 'level_0', 'level_1_x': 'level_1'}, inplace=True)

# result
    time level_0 level_1  number  length  width
0   60.0     id1     id2       0      25      2
1  420.0     id2     id3       3     750      2
2  120.0     id3     id4       2      12      7
3  336.0     id4     id5       1      56      3
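
For larger frames, a row-wise `apply` can be slow. The following is a minimal sketch of a vectorized variant of the same idea, not part of the original answer: it sorts each (level_0, level_1) pair with NumPy and merges on the two sorted columns. The helper name `add_pair_key` and the column names `key_lo` and `key_hi` are made up for illustration. Ids are compared as plain strings, so 'id10' sorts before 'id2'; that is fine here because the key only has to be the same for both orderings of a pair.

```python
import pandas as pd
import numpy as np

df = pd.DataFrame({
    'time': [60.0, 420.0, 120.0, 336.0],
    'level_0': ['id1', 'id2', 'id3', 'id4'],
    'level_1': ['id2', 'id3', 'id4', 'id5'],
})
temp = pd.DataFrame({
    'number': [0, 1, 2, 3],
    'level_0': ['id1', 'id4', 'id4', 'id2'],
    'level_1': ['id2', 'id5', 'id3', 'id3'],
    'length': [25, 56, 12, 750],
    'width': [2, 3, 7, 2],
})


def add_pair_key(frame):
    """Return a copy with key_lo/key_hi holding each row's two ids in sorted order."""
    # Sort within each row, so (id4, id3) and (id3, id4) both become (id3, id4)
    pair = np.sort(frame[['level_0', 'level_1']].to_numpy(), axis=1)
    return frame.assign(key_lo=pair[:, 0], key_hi=pair[:, 1])


keys = ['key_lo', 'key_hi']
result = (
    add_pair_key(df)
    # Drop temp's own level columns so the output keeps df's orientation
    .merge(add_pair_key(temp).drop(columns=['level_0', 'level_1']), on=keys)
    .drop(columns=keys)
)
print(result)
```

Merging on two sorted columns rather than on a concatenated string also avoids having to pick a separator that never occurs inside an id.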
Soheil Pourbafrani

I built the id inside apply with a conditional expression that always puts the smaller id first (df1 and df2 stand for the question's df and temp):

 # Same key for (a, b) and (b, a): the smaller id always comes first
 df1['id'] = df1.apply(lambda row: row.level_1 + ":" + row.level_0 if row.level_0 > row.level_1 else row.level_0 + ":" + row.level_1, axis=1)
 df2['id'] = df2.apply(lambda row: row.level_1 + ":" + row.level_0 if row.level_0 > row.level_1 else row.level_0 + ":" + row.level_1, axis=1)
 final = df1.merge(df2, on=['id'])
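
Since the question mentions None values appearing after the merge, it can help to check which pairs fail to match once the id column exists. The sketch below is an illustration rather than part of the original answer: it uses a left merge with pandas' `indicator=True` option, which adds a `_merge` column marking unmatched rows as `left_only`. The toy data (including the pair id7/id8) and the helper name `pair_id` are invented for the example.

```python
import pandas as pd

# Hypothetical toy data: the (id7, id8) pair has no counterpart in df2
df1 = pd.DataFrame({'time': [60.0, 120.0, 500.0],
                    'level_0': ['id1', 'id3', 'id7'],
                    'level_1': ['id2', 'id4', 'id8']})
df2 = pd.DataFrame({'number': [0, 2],
                    'level_0': ['id1', 'id4'],
                    'level_1': ['id2', 'id3']})


def pair_id(row):
    # Smaller id first, so both orderings of a pair share one key
    lo, hi = sorted((row.level_0, row.level_1))
    return lo + ':' + hi


df1['id'] = df1.apply(pair_id, axis=1)
df2['id'] = df2.apply(pair_id, axis=1)

# A left merge keeps every row of df1; _merge records whether a match was found
checked = df1.merge(df2[['id', 'number']], on='id', how='left', indicator=True)
unmatched = checked.loc[checked['_merge'] == 'left_only', 'id'].tolist()
print(unmatched)
```

An empty `unmatched` list means every pair in df1 found a partner in df2, reversed or not.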
Golden Lion

The solution I am providing is not in Python, but you can easily replicate it in Python. It is written in Scala/Spark, and it should give you the idea of how to do it.

// Creating the sample data
// (toDF and the $"..." syntax need spark.implicits._, which spark-shell and Databricks notebooks import automatically)
val df = Seq((60.0,"id1","id2"),(420.0,"id2","id3"),(120.0,"id3","id4"),(336.0,"id4","id5"))
  .toDF("time","level_0","level_1")
val temp = Seq((0,"id1","id2",25,2),(1,"id4","id5",56,3),(2,"id4","id3",12,7),(3,"id2","id3",750,2))
  .toDF("number","level_0","level_1","length","width")

// Combine the two id columns into a single "level" column, always in the same order
import org.apache.spark.sql.functions._
val df1 = df.withColumn("index0", substring(col("level_0"), 3, 4))
  .withColumn("index1", substring(col("level_1"), 3, 4))
  .withColumn("level", when($"index0" <= $"index1", concat($"level_0", lit(" "), $"level_1"))
    .otherwise(concat($"level_1", lit(" "), $"level_0")))
  .drop("index1", "index0", "level_0", "level_1")

val temp1 = temp.withColumn("index0", substring(col("level_0"), 3, 4))
  .withColumn("index1", substring(col("level_1"), 3, 4))
  .withColumn("level", when($"index0" <= $"index1", concat($"level_0", lit(" "), $"level_1"))
    .otherwise(concat($"level_1", lit(" "), $"level_0")))
  .drop("index1", "index0", "level_0", "level_1")

// Join on the combined column, then split it back into level_0 and level_1
val finalDF = df1.join(temp1, Seq("level"), "inner")
  .withColumn("level_0", split($"level", " ")(0))
  .withColumn("level_1", split($"level", " ")(1))
  .drop("level").orderBy("level_0")
// display is Databricks-specific; use finalDF.show() elsewhere
display(finalDF)

The final output was posted as an image (not reproduced here).

Nikunj Kakadiya