Drop function not working after left outer join in PySpark

I am facing the same issue while joining two DataFrames, A and B.

For example:

c = df_a.join(df_b, [df_a.col1 == df_b.col1], how="left").drop(df_b.col1)

When I try to drop the duplicate column as above, the query does not drop col1 of df_b. However, when I try to drop col1 of df_a instead, it is able to drop it.
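
For context, here is a minimal sketch of what I am doing; the data and the non-key column names are made up purely for illustration (my real dataframes have far more columns):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Made-up sample data just to reproduce the shape of the problem.
df_a = spark.createDataFrame([(1, "x"), (2, "y")], ["col1", "a_val"])
df_b = spark.createDataFrame([(1, "p"), (3, "q")], ["col1", "b_val"])

# Left join on col1, then try to drop the duplicate key column coming from df_b.
c = df_a.join(df_b, [df_a.col1 == df_b.col1], how="left").drop(df_b.col1)
c.printSchema()  # I expect a single col1 here, but df_b's col1 is not dropped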

Could anyone please explain this?

Note: I tried the same thing in my project, which has more than 200 columns, and it shows the same problem. The drop sometimes works properly when there are only a few columns, but not when there are many.


Gaurang Shah
Sagar patro

2 Answers


A function to drop duplicate columns after a join:

def dropDupeDfCols(df):
    # Walk the columns in order: keep the first occurrence of each name
    # and remember the positional index of every duplicate.
    newcols = []
    dupcols = []

    for i in range(len(df.columns)):
        if df.columns[i] not in newcols:
            newcols.append(df.columns[i])
        else:
            dupcols.append(i)

    # Temporarily rename every column to its positional index so the
    # duplicate positions can be dropped unambiguously, then restore
    # the original (now unique) names.
    df = df.toDF(*[str(i) for i in range(len(df.columns))])
    for dupcol in dupcols:
        df = df.drop(str(dupcol))

    return df.toDF(*newcols)
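
For example, applied to the join from the question (a sketch; df_a and df_b are assumed to exist with a shared col1):

# Join as in the question, then strip whichever columns came out with
# duplicate names; only the first occurrence of each name survives.
c = df_a.join(df_b, [df_a.col1 == df_b.col1], how="left")
c = dropDupeDfCols(c)
c.printSchema()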
aamirmalik124

I faced some similar issues recently. Let me show them below using your case.

I am creating two dataframes with the same data:

scala> val df_a = Seq((1, 2, "as"), (2,3,"ds"), (3,4,"ew"), (4, 1, "re"), (3,1,"ht")).toDF("a", "b", "c")
df_a: org.apache.spark.sql.DataFrame = [a: int, b: int ... 1 more field]

scala> val df_b = Seq((1, 2, "as"), (2,3,"ds"), (3,4,"ew"), (4, 1, "re"), (3,1,"ht")).toDF("a", "b", "c")
df_b: org.apache.spark.sql.DataFrame = [a: int, b: int ... 1 more field]

Joining them:

scala> val df = df_a.join(df_b, df_a("b") === df_b("a"), "leftouter")
df: org.apache.spark.sql.DataFrame = [a: int, b: int ... 4 more fields]

scala> df.show
+---+---+---+---+---+---+
|  a|  b|  c|  a|  b|  c|
+---+---+---+---+---+---+
|  1|  2| as|  2|  3| ds|
|  2|  3| ds|  3|  1| ht|
|  2|  3| ds|  3|  4| ew|
|  3|  4| ew|  4|  1| re|
|  4|  1| re|  1|  2| as|
|  3|  1| ht|  1|  2| as|
+---+---+---+---+---+---+

Let's drop a column that is not present in the above dataframe; here "d" is just an example of a name that does not exist:

scala> df.drop("d").show
+---+---+---+---+---+---+
|  a|  b|  c|  a|  b|  c|
+---+---+---+---+---+---+
|  1|  2| as|  2|  3| ds|
|  2|  3| ds|  3|  1| ht|
|  2|  3| ds|  3|  4| ew|
|  3|  4| ew|  4|  1| re|
|  4|  1| re|  1|  2| as|
|  3|  1| ht|  1|  2| as|
+---+---+---+---+---+---+

Ideally we would expect Spark to throw an error, but it executes successfully.

Now, if you drop a column that does exist in the above dataframe:

scala> df.drop("a").show
+---+---+---+---+
|  b|  c|  b|  c|
+---+---+---+---+
|  2| as|  3| ds|
|  3| ds|  1| ht|
|  3| ds|  4| ew|
|  4| ew|  1| re|
|  1| re|  2| as|
|  1| ht|  2| as|
+---+---+---+---+

It drops all the columns with the provided column name from the input dataframe.
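
The behaviour is the same in PySpark, which is what the question uses: dropping by a string name removes every column that matches it. A rough sketch with the question's names (df_a and df_b assumed as in the question):

# Dropping by name removes both copies of col1, the one from df_a
# and the one from df_b.
c = df_a.join(df_b, [df_a.col1 == df_b.col1], how="left")
c.drop("col1").printSchema()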

If you want to drop specific columns, it should be done as below:

scala> df.drop(df_a("a")).show()
+---+---+---+---+---+
|  b|  c|  a|  b|  c|
+---+---+---+---+---+
|  2| as|  2|  3| ds|
|  3| ds|  3|  1| ht|
|  3| ds|  3|  4| ew|
|  4| ew|  4|  1| re|
|  1| re|  1|  2| as|
|  1| ht|  1|  2| as|
+---+---+---+---+---+
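
In PySpark the counterpart is to pass the Column object from the source dataframe, which is what the question already attempts; a sketch with the question's names (per the comments below, the asker reports this sometimes has no effect on wide dataframes):

# Drop only the col1 that came from df_b, keeping df_a's col1.
c = df_a.join(df_b, [df_a.col1 == df_b.col1], how="left")
c.drop(df_b.col1).printSchema()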

I don't think Spark accepts the input the way you gave it (see below):


scala> df.drop(df_a.a).show()
<console>:30: error: value a is not a member of org.apache.spark.sql.DataFrame
       df.drop(df_a.a).show()
                    ^

scala> df.drop(df_a."a").show()
<console>:1: error: identifier expected but string literal found.
df.drop(df_a."a").show()
             ^

If you provide the input to drop as below, it executes but has no impact:

scala> df.drop("df_a.a").show
+---+---+---+---+---+---+
|  a|  b|  c|  a|  b|  c|
+---+---+---+---+---+---+
|  1|  2| as|  2|  3| ds|
|  2|  3| ds|  3|  1| ht|
|  2|  3| ds|  3|  4| ew|
|  3|  4| ew|  4|  1| re|
|  4|  1| re|  1|  2| as|
|  3|  1| ht|  1|  2| as|
+---+---+---+---+---+---+

The reason is that Spark interprets "df_a.a" as a nested column. Since that column is not present, it ideally should have thrown an error, but as explained above, it just executes.
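
The same silent no-op happens in PySpark; a quick sketch (again with the question's names): the dotted string is treated as one (nested) column name that does not exist, so nothing is dropped.

# There is no column literally named "df_a.col1" (nor a struct column df_a
# with a field col1), so the dataframe comes back unchanged.
c = df_a.join(df_b, [df_a.col1 == df_b.col1], how="left")
c.drop("df_a.col1").printSchema()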

Hope this helps..!!!

Sarath Chandra Vema
  • But when I try something like drop(df_b.col1) in PySpark, it executes successfully without any effect. I am not sure about Scala Spark, though. – Sagar patro Oct 08 '19 at 13:44
  • Can you please confirm that this answer resolved your query ?? – Sarath Chandra Vema Oct 08 '19 at 13:45
  • But as I said, my query above works properly with fewer columns; since I have more columns, it doesn't work. – Sagar patro Oct 08 '19 at 13:45
  • Ohh... I have given the code for Scala. In PySpark, the code you used works, but as I said in my answer, if you don't specify the source dataframe it will remove all matching columns. – Sarath Chandra Vema Oct 08 '19 at 13:48
  • Yes, it removes all matching columns, and it also works when there are fewer columns. But with a larger number of columns it fails to drop the duplicate column from the right-hand table. – Sagar patro Oct 08 '19 at 13:50
  • Can you explain it a bit more ?? – Sarath Chandra Vema Oct 08 '19 at 13:52
  • I mean, in my current project I did a left join, and after the join I wanted to drop the duplicate column from the right-hand table, which acted as the foreign key in my join condition. – Sagar patro Oct 08 '19 at 13:54
  • So when I tried to drop that right-hand column, the drop command above executed successfully but failed to drop the duplicate column from the right-hand table. However, when I tried to drop the left-hand table's duplicate column, it worked. – Sagar patro Oct 08 '19 at 13:55
  • Okay, then you can specify the columns to drop, as I mentioned in my answer. If you don't need those columns, then while joining you can use ```df.select()``` to pick only the columns you need. – Sarath Chandra Vema Oct 08 '19 at 13:57
  • But it was hard for me to write out all 170 columns; in the end, this is what I did. – Sagar patro Oct 08 '19 at 13:58