
I have two files, A and B, that are exactly identical. I am trying to perform inner and outer joins on the two dataframes built from them. Since all of the columns are duplicates, the existing answers were of no help: the other questions I have gone through have only a column or two in common, whereas my files are complete duplicates of each other, both in data and in column names.

My code:

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql import DataFrameReader, DataFrameWriter
from datetime import datetime

import time

# @params: [JOB_NAME]
args = getResolvedOptions(sys.argv, ['JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

print("All imports were successful.")

df = spark.read.orc(
    's3://****'
)
print("First dataframe read with headers set to True")
df2 = spark.read.orc(
    's3://****'
)
print("Second dataframe read with headers set to True")

# df3 = df.join(df2, ['c_0'], "outer")

# df3 = df.join(
#     df2,
#     df["column_test_1"] == df2["column_1"],
#     "outer"
# )

df3 = df.alias('l').join(df2.alias('r'), on='c_0') #.collect()

print("Dataframes have been joined successfully.")
output_file_path = 's3://****'

df3.write.orc(
    output_file_path
)
print("Dataframe has been written to csv.")
job.commit()

The error that I am facing is:

pyspark.sql.utils.AnalysisException: u'Duplicate column(s): "c_4", "c_38", "c_13", "c_27", "c_50", "c_16", "c_23", "c_24", "c_1", "c_35", "c_30", "c_56", "c_34", "c_7", "c_46", "c_49", "c_57", "c_45", "c_31", "c_53", "c_19", "c_25", "c_10", "c_8", "c_14", "c_42", "c_20", "c_47", "c_36", "c_29", "c_15", "c_43", "c_32", "c_5", "c_37", "c_18", "c_54", "c_3", "__created_at__", "c_51", "c_48", "c_9", "c_21", "c_26", "c_44", "c_55", "c_2", "c_17", "c_40", "c_28", "c_33", "c_41", "c_22", "c_11", "c_12", "c_52", "c_6", "c_39" found, cannot save to file.;'
End of LogType:stdout
Aviral Srivastava
  • And how can I explicitly select the columns? Do you mean to say ```df.select('c_0' as 'df_c_0', 'c_1' as 'df_c_1', .... 'c_49' as 'df_c_49').join(df2.select('c_0' as 'df2_c_0', 'c_1' as 'df2_c_1', .... 'c_49' as 'df2_c_49'))``` ? – Aviral Srivastava Mar 11 '19 at 14:35
  • Possible duplicate of [Spark Dataframe distinguish columns with duplicated name](https://stackoverflow.com/questions/33778664/spark-dataframe-distinguish-columns-with-duplicated-name) – pault Mar 11 '19 at 14:44
  • The answer is the same. You need to alias the column names. – pault Mar 11 '19 at 14:49
  • No, none of the answers could solve my problem. Yes, it is because of my weakness that I could not extrapolate the aliasing further, but asking this question helped me get to know the `selectExpr` function. Hence, I request you to take back the duplicate remark; it will be helpful for a rookie like me. – Aviral Srivastava Mar 11 '19 at 14:51
  • My vote to close as a duplicate is just a vote. I still need 4 others (or one gold badge holder) to agree with me, and regardless of the outcome [this doesn't count against you](https://stackoverflow.com/help/duplicates). The solution you are looking for is contained in [this answer](https://stackoverflow.com/a/33779190/5858851). At the bottom, they show how to dynamically rename all the columns. `selectExpr` is not needed (though it's one alternative). If you still feel that this is different, [edit] your question and explain exactly how it's different. – pault Mar 11 '19 at 14:55
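
For reference, a minimal sketch of the dynamic renaming that the linked answer describes (the `df`/`df2` names come from the question; the `df1_`/`df2_` prefixes are illustrative):

from pyspark.sql import functions as F

# Alias every column of each dataframe with a distinguishing prefix
# so that no names collide after the join.
left = df.select([F.col(c).alias('df1_' + c) for c in df.columns])
right = df2.select([F.col(c).alias('df2_' + c) for c in df2.columns])

joined = left.join(right, left['df1_c_0'] == right['df2_c_0'], 'outer')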

3 Answers


There is no shortcut here. PySpark expects the left and right dataframes to have distinct sets of field names (with the exception of the join key).

One solution is to prefix each field name with "left_" or "right_", as follows:

# Obtain the column lists
left_cols = df.columns
right_cols = df2.columns

# Prefix each dataframe's fields with "left_" or "right_"
df = df.selectExpr([col + ' as left_' + col for col in left_cols])
df2 = df2.selectExpr([col + ' as right_' + col for col in right_cols])

# Perform the join on the now-prefixed key column
df3 = df.join(df2, on=df['left_c_0'] == df2['right_c_0'])
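
A minimal sketch of how this plays out, assuming a live `spark` session and made-up two-column dataframes (the `c_0`/`c_1` names are illustrative):

df = spark.createDataFrame([(1, "a"), (2, "b")], ("c_0", "c_1"))
df2 = spark.createDataFrame([(1, "x"), (3, "y")], ("c_0", "c_1"))

df = df.selectExpr([c + ' as left_' + c for c in df.columns])
df2 = df2.selectExpr([c + ' as right_' + c for c in df2.columns])

# The join is now unambiguous and the result has no duplicate names
df.join(df2, on=df['left_c_0'] == df2['right_c_0'], how='inner').show()
# +--------+--------+---------+---------+
# |left_c_0|left_c_1|right_c_0|right_c_1|
# +--------+--------+---------+---------+
# |       1|       a|        1|        x|
# +--------+--------+---------+---------+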
Greg
  • Recommend that you don't use `col` in the `for` loop, as it overrides the native PySpark function `col` and then it won't be recognised. – happyspace Nov 12 '20 at 23:37

Here is a helper function that joins two dataframes while adding aliases:

def join_with_aliases(left, right, on, how, right_prefix):
    # Rename every non-key column of the right dataframe by appending
    # right_prefix; the join keys keep their original names.
    renamed_right = right.selectExpr(
        [
            f"{col} as {col}{right_prefix}"
            for col in right.columns
            if col not in on
        ]
        + on
    )
    return left.join(renamed_right, on=on, how=how)

and here is an example of how to use it:

df1 = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"]], ("id", "value"))
df2 = spark.createDataFrame([[1, "a"], [2, "b"], [3, "c"]], ("id", "value"))

join_with_aliases(
    left=df1,
    right=df2,
    on=["id"],
    how="inner",
    right_prefix="_right"
).show()

+---+-----+-----------+
| id|value|value_right|
+---+-----+-----------+
|  1|    a|          a|
|  3|    c|          c|
|  2|    b|          b|
+---+-----+-----------+
Gonza Piotti

I did something like this in Scala; you can convert the same into PySpark as well (a rough translation is sketched after the steps below)...

  • Rename the column names in each dataframe (both dataframes must be declared as var so they can be reassigned)

    dataFrame1.columns.foreach(columnName => {
      dataFrame1 = dataFrame1.withColumnRenamed(columnName, s"left_$columnName")
    })

    dataFrame2.columns.foreach(columnName => {
      dataFrame2 = dataFrame2.withColumnRenamed(columnName, s"right_$columnName")
    })

  • Now join by mentioning the renamed column names

    val resultDF = dataFrame1.join(dataFrame2, dataFrame1("left_c_0") === dataFrame2("right_c_0"))
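
A rough PySpark translation of the same idea, as a sketch (it assumes the question's `df` and `df2`; the loop variable and result name are illustrative):

# Rename every column with a side-specific prefix
for column_name in df.columns:
    df = df.withColumnRenamed(column_name, 'left_' + column_name)
for column_name in df2.columns:
    df2 = df2.withColumnRenamed(column_name, 'right_' + column_name)

# Join by mentioning the renamed column names
result_df = df.join(df2, df['left_c_0'] == df2['right_c_0'])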
    
Prasad Khode