0

I have a PySpark data frame like the one below.

df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  1|   N|
|  2|   Y|
|  3|   N|
+---+----+

I want to delete a record when its id is duplicated and its test value is N.

The resulting new_df should look like this:

new_df.show()
+---+----+
| id|test|
+---+----+
|  1|   Y|
|  2|   Y|
|  3|   N|
+---+----+

I am unable to figure out how to do this.

I tried a groupBy on id with a count, but that returns only the id column and the count.

This is what I tried:

grouped_df = df.groupBy("id").count()

How can I achieve my desired result?

Edit

I have a data frame like the one below.

+-------------+--------------------+--------------------+
|           sn|              device|           attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   Y|
|4VT1735J00337|                  TV|                   N|
|4VT1735J00337|                  TV|                   Y|
|4VT47B52003EE|              Router|                   N|
|4VT47C5N00A10|               Other|                   N|
+-------------+--------------------+--------------------+

When I do the following:

new_df = df.groupBy("sn").agg(max("attribute").alias("attribute"))

I get the error: 'str' object has no attribute 'alias'

The expected result is shown below:

+-------------+--------------------+--------------------+
|           sn|              device|           attribute|
+-------------+--------------------+--------------------+
|4MY16A5602E0A|       Android Phone|                   N|
|4MY16A5W02DE8|       Android Phone|                   Y|
|4VT1735J00337|                  TV|                   Y|
|4VT47B52003EE|              Router|                   N|
|4VT47C5N00A10|               Other|                   N|
+-------------+--------------------+--------------------+
User12345

4 Answers

5

Not the most generic solution but should fit here nicely:

from pyspark.sql.functions import max

df = spark.createDataFrame(
  [(1, "Y"), (1, "N"), (2, "Y"), (3, "N")], ("id", "test")
)

df.groupBy("id").agg(max("test").alias("test")).show()
# +---+----+
# | id|test|
# +---+----+
# |  1|   Y|
# |  3|   N|
# |  2|   Y|
# +---+----+
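A note on the error from the question's edit: without the import above, the name max refers to Python's built-in function, which treats the string as a sequence of characters and returns a plain str. The sketch below uses no Spark at all and only illustrates why .alias then fails; the variable names are illustrative.

```python
# Python's built-in max iterates over the characters of the string
# and returns the largest one, which is an ordinary str.
result = max("attribute")
print(repr(result))

try:
    # A str has no alias method, so this raises AttributeError.
    result.alias("attribute")
    error_message = None
except AttributeError as exc:
    error_message = str(exc)

print(error_message)
```

Importing max from pyspark.sql.functions shadows the built-in, so max("attribute") then returns a Column, which does have an alias method.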

A more generic one:

from pyspark.sql.functions import col, count, when

test = when(count(when(col("test") == "Y", "Y")) > 0, "Y").otherwise("N")

df.groupBy("id").agg(test.alias("test")).show()
# +---+----+
# | id|test|
# +---+----+
# |  1|   Y|
# |  3|   N|
# |  2|   Y|
# +---+----+

This can be generalized to accommodate more classes and a non-trivial ordering. For example, if you had three classes Y, ?, N evaluated in this order, you could use:

(when(count(when(col("test") == "Y", True)) > 0, "Y")
     .when(count(when(col("test") == "?", True)) > 0, "?")
     .otherwise("N"))

If there are other columns you need to preserve, these methods won't work, and you'll need something like the approach shown in "Find maximum row per group in Spark DataFrame".

Alper t. Turker
3

Another option using row_number:

df.selectExpr(
    '*', 
    'row_number() over (partition by id order by test desc) as rn'
).filter('rn=1 or test="Y"').drop('rn').show()

+---+----+
| id|test|
+---+----+
|  1|   Y|
|  3|   N|
|  2|   Y|
+---+----+

This method doesn't aggregate; it only removes rows whose id is duplicated and whose test is N.

Psidom
0

Using a Spark SQL temporary view (I ran this in a Databricks notebook; the code is Scala and the last line is a separate %sql cell):

case class T(id:Int,test:String)
val df=spark.createDataset(Seq(T(1, "Y"), T(1, "N"), T(2, "Y"), T(3, "N")))
df.createOrReplaceTempView("df")
%sql select id, max(test) from df group by id


Krish
0

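You can use the code below:

# register as a temp table
# (on Spark 2.0+, df.createOrReplaceTempView("df") replaces the deprecated registerTempTable)
df.registerTempTable("df")

# keep a single row per id, preferring "Y" over "N"
# sqlc is assumed to be an existing SQLContext; the query must be passed as a string
newDF = sqlc.sql("""
WITH dfCte AS
(
    select *, row_number() over (partition by id order by test desc) as RowNumber
    from df
)
select * from dfCte where RowNumber = 1
""")

# drop the row numbers and show the new data frame
newDF.drop('RowNumber').show()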
IamGroot