Hi, I have two dataframes to join:
#df1
name   genre    count
satya  drama    1
satya  action   3
abc    drama    2
abc    comedy   2
def    romance  1
#df2
name   max_count
satya  3
abc    2
def    1
Now I want to join the two dataframes on name and count == max_count.
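From the sample data, the rows I expect the inner join to keep are:

name   genre    count  max_count
satya  action   3      3
abc    drama    2      2
abc    comedy   2      2
def    romance  1      1

But the join below throws an error: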
import pyspark.sql.functions as F

df = spark.read.csv('file', sep='###', header=True)
# count rows per (name, genre) pair
df1 = df.groupBy("name", "genre").count()
# max of those counts per name
df2 = df1.groupBy("name").agg(F.max("count").alias("max_count"))

# Now trying to join both dataframes on name and count == max_count
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count))
final_df.show()   ### Error
py4j.protocol.Py4JJavaError: An error occurred while calling o207.showString.
: org.apache.spark.SparkException: Exception thrown in awaitResult:
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: count(1)
    at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)
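For reference, both dataframes look sane right before the join; printSchema() on each gives roughly the following (sketched from what read.csv plus these aggregations normally produce, not pasted from my session):

df1.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- genre: string (nullable = true)
#  |-- count: long (nullable = false)

df2.printSchema()
# root
#  |-- name: string (nullable = true)
#  |-- max_count: long (nullable = true)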
But the same join succeeds as a "left" join:
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count), "left")
final_df.show()   ### Succeeds, but I don't want a left join, I want an inner join
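As a stopgap, I can emulate the inner join by filtering the unmatched rows out of the left-join result, since max_count is never null for matched rows (a sketch built on the left join above; I have not confirmed it avoids the same analyzer error, and it is not what I actually want):

# left join keeps every df1 row; unmatched rows get null df2 columns
joined = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count), "left")
# dropping the null-max_count rows leaves exactly the inner-join rows
inner_like = joined.filter(df2.max_count.isNotNull())
inner_like.show()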
My question is: why does the inner join above fail? Am I doing something wrong there?
I referred to the question "Find maximum row per group in Spark DataFrame" and used its first answer (the two-groupBy method), but I get the same error.
I am on spark-2.0.0-bin-hadoop2.7 with Python 2.7.
Please suggest. Thanks.
Edit:
The above scenario works with Spark 1.6, which is quite surprising. What is wrong with Spark 2.0, or with my installation? I will reinstall, check, and update here.
Has anybody tried this on Spark 2.0 and succeeded by following Yaron's answer below?
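In the meantime, one variant I still plan to try renames the columns before the join, in case df1.count is resolving to the DataFrame.count method rather than the column (a sketch, untested on my setup; cnt and mc are just names I made up):

from pyspark.sql.functions import col

# rename so nothing clashes with the DataFrame.count method
df1r = df1.withColumnRenamed("count", "cnt")
df2r = df2.withColumnRenamed("max_count", "mc")

# join on name and the renamed count columns
final_df = df1r.join(df2r, (df1r.name == df2r.name) & (col("cnt") == col("mc")))
final_df.show()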