0

Hi, I have two DataFrames to join:

#df1
 name    genre  count
 satya   drama    1
 satya   action   3
 abc     drama    2
 abc     comedy   2
 def     romance  1

#df2
 name  max_count
 satya  3
 abc    2
 def    1

Now I want to join the two DataFrames above on name and count == max_count, but I am getting an error:

import pyspark.sql.functions as F
from pyspark.sql.functions import count, col
from pyspark.sql.functions import struct
df = spark.read.csv('file',sep = '###', header=True)
df1 = df.groupBy("name", "genre").count()
df2 = df1.groupby('name').agg(F.max("count").alias("max_count"))
#Now trying to join both dataframes
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count))
final_df.show()  # Error
# py4j.protocol.Py4JJavaError: An error occurred while calling o207.showString.
# : org.apache.spark.SparkException: Exception thrown in awaitResult:
#     at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:194)
# Caused by: java.lang.UnsupportedOperationException: Cannot evaluate expression: count(1)
#     at org.apache.spark.sql.catalyst.expressions.Unevaluable$class.doGenCode(Expression.scala:224)

But it succeeds with a "left" join:

final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count), "left")
final_df.show()  # Success, but I don't want a left join; I want an inner join

My question is: why does the inner join above fail? Am I doing something wrong there?

I referred to the question "Find maximum row per group in Spark DataFrame" and used the first answer (the two-groupBy method), but I get the same error.
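For reference, the result that the two-groupBy approach is meant to produce can be sketched in plain Python, without Spark. The `rows` list below is hypothetical input, chosen only so that the per-genre counts match the df1 sample above; it illustrates the intended logic and says nothing about why the Spark 2.0 join fails.

```python
from collections import Counter

# Hypothetical raw (name, genre) rows, chosen so the counts match df1 above.
rows = [
    ("satya", "drama"),
    ("satya", "action"), ("satya", "action"), ("satya", "action"),
    ("abc", "drama"), ("abc", "drama"),
    ("abc", "comedy"), ("abc", "comedy"),
    ("def", "romance"),
]

# Step 1: count per (name, genre), like groupBy("name", "genre").count()
pair_counts = Counter(rows)

# Step 2: maximum count per name, like groupby("name").agg(max("count"))
max_count = {}
for (name, _genre), n in pair_counts.items():
    max_count[name] = max(max_count.get(name, 0), n)

# Step 3: inner join on name and count == max_count
result = sorted(
    (name, genre, n)
    for (name, genre), n in pair_counts.items()
    if n == max_count[name]
)
print(result)
# [('abc', 'comedy', 2), ('abc', 'drama', 2), ('def', 'romance', 1), ('satya', 'action', 3)]
```

Ties are kept (both genres for abc), which is also what an inner join on count == max_count would return.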

I am on spark-2.0.0-bin-hadoop2.7 and Python 2.7.

Please suggest. Thanks.

Edit:

The above scenario works with Spark 1.6, which is quite surprising. I am not sure what is wrong with Spark 2.0 (or with my installation; I will reinstall, check, and update here).

Has anybody tried this on Spark 2.0 and succeeded by following Yaron's answer below?

Satya
  • Just a guess: could the column names be conflicting with DataFrame methods, e.g. `count`? I don't know why that would affect only the inner join, though. You could try renaming `count` to `cnt` or something similar, just to rule out that possibility. – RedBaron Sep 22 '16 at 06:55
  • @RedBaron, I already tried that. Same error. – Satya Sep 22 '16 at 06:59

3 Answers

3

I ran into the same problem when I tried to join two DataFrames where one of them was GroupedData. It worked for me when I cached the GroupedData DataFrame before the inner join. For your code, try:

df1 = df.groupBy("name", "genre").count().cache()    # added cache()
df2 = df1.groupby('name').agg(F.max("count").alias("max_count")).cache()   # added cache()
final_df = df1.join(df2, (df1.name == df2.name) & (df1.count == df2.max_count))    # no change
  • @Johann, yes, it worked, but I am not able to understand why. Can you please explain a little why it worked and why the non-cached version did not? – Satya Oct 04 '16 at 05:28
  • 1
    @Satya My understanding is that given Spark's lazy evaluation mechanism, if we don't cache df1 and df2 before joining them, Spark will go through all the steps to create df1 and df2 dynamically and separately when the join command is issued. Given the error code "Cannot evaluate expression: count(1)", it seems like Spark is stuck in the loop of finding the value of count multiple times. – user6891234 Oct 05 '16 at 17:21
2

Update: It seems that your code was also failing because of the use of "count" as a column name. count appears to clash with the DataFrame API, where count is also a method name. Renaming count to "mycount" solved the problem. The working code below was modified to support Spark version 1.5.2, which I used to test your issue.

import pyspark.sql.functions as F
from pyspark.sql.functions import col

df = sqlContext.read.format("com.databricks.spark.csv").option("header", "true").load("/tmp/fac_cal.csv")
df1 = df.groupBy("name", "genre").count()
# Rename "count" so the column name no longer matches the DataFrame.count method
df1 = df1.select(col("name"), col("genre"), col("count").alias("mycount"))
df2 = df1.groupby('name').agg(F.max("mycount").alias("max_count"))
df2 = df2.select(col('name').alias('name2'), col("max_count"))
# Now trying to join both DataFrames
final_df = df1.join(df2, [df1.name == df2.name2, df1.mycount == df2.max_count])
final_df.show()

+-----+---------+-------+-----+---------+
| name|    genre|mycount|name2|max_count|
+-----+---------+-------+-----+---------+
|brata|   comedy|      2|brata|        2|
|brata|    drama|      2|brata|        2|
|panda|adventure|      1|panda|        1|
|panda|  romance|      1|panda|        1|
|satya|   action|      3|satya|        3|
+-----+---------+-------+-----+---------+
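On the "count" naming point: in PySpark, `count` is a regular method of DataFrame, so attribute access such as `df1.count` returns that method rather than a column named `count`, while `df1["count"]` refers to the column unambiguously. The plain-Python sketch below shows the general mechanism (`__getattr__` is consulted only when normal attribute lookup fails). `TinyFrame` is a hypothetical stand-in written for this illustration and is not part of PySpark; the sketch does not by itself explain the inner-join error.

```python
class TinyFrame(object):
    """Hypothetical stand-in for a DataFrame; not part of PySpark."""

    def __init__(self, columns):
        self._columns = list(columns)

    def count(self):
        # A regular method, playing the role of DataFrame.count().
        return 0

    def __getattr__(self, name):
        # Python calls this only when normal attribute lookup fails,
        # so it is never reached for "count".
        if name in self.__dict__.get("_columns", ()):
            return "column:" + name
        raise AttributeError(name)

    def __getitem__(self, name):
        # Bracket access always goes through the column lookup.
        if name not in self._columns:
            raise KeyError(name)
        return "column:" + name


tf = TinyFrame(["name", "count"])
print(tf.name)             # column:name  (falls through to __getattr__)
print(callable(tf.count))  # True         (the method wins over the column)
print(tf["count"])         # column:count (bracket access reaches the column)
```

Renaming the column, as done above, or using bracket access both avoid the ambiguity.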

The example of a complex join condition in the documentation (https://spark.apache.org/docs/2.0.0/api/python/pyspark.sql.html):

>>> cond = [df.name == df3.name, df.age == df3.age]
>>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]

Can you try:

final_df = df1.join(df2, [df1.name == df2.name , df1.mycount == df2.max_count])

Note also that, according to the spec, "left" is not listed among the valid join types: how – str, default ‘inner’. One of inner, outer, left_outer, right_outer, leftsemi.

Yaron
  • Hi Yaron, it is still not working, and 'left' works fine on my system. I renamed the columns in both DataFrames (df1.columns == name, genre, cnt; df2.columns == name, cnt) and tried f = df1.join(df2,['name','cnt']), which gave the error, and f = df1.join(df2,['name','cnt'],'left'), which succeeded. – Satya Sep 22 '16 at 10:25
  • And in your first example, have you tried an inner join? If it works on your system, then something is fishy with my Spark version. – Satya Sep 22 '16 at 10:25
  • @Satya can you please share with us input file which reproduce the issue you see? (e.g. the 'file' you read in spark.read.csv('file'... ) ) – Yaron Sep 22 '16 at 11:59
  • @Yaron, sorry, I am not able to (the file is actually huge, >10 GB). But I tried the same thing with a manually created DataFrame using the sample inputs: data = spark.createDataFrame([('satya', 'action'), ('satya', 'action'), ('satya', 'drama'), ('satya', 'action'), ('brata', 'comedy'), ('brata', 'comedy'), ('panda', 'romance'), ('panda', 'adventure'), ('brata', 'drama'), ('brata', 'drama')], ['name', 'genre']) – Satya Sep 22 '16 at 16:10
  • I am facing the same issue with any DataFrame (read from a file such as CSV or JSON, or created from manual input) when trying an inner join with multiple equality conditions, but not with any outer join. – Satya Sep 22 '16 at 16:14
  • Can you please share simple example file which reproduce the problem? – Yaron Sep 22 '16 at 16:23
  • Hi, check this link: http://www.filedropper.com/favcal. You have to read it without any sep. – Satya Sep 22 '16 at 16:28
  • @Satya - I used the csv file you provided, and managed to understand the root cause of the problem (which was using "count" as column name) - see details in my updated answer – Yaron Sep 25 '16 at 06:20
  • Hi Yaron, I tried your recent code snippet. Unfortunately I am facing the same error this time as well. I attached screenshots of my attempt (http://www.filedropper.com/innererror, http://www.filedropper.com/leftsuccess). I don't understand why the inner join fails while the left join gives me a result. I have started to believe there might be some issue with the Spark version installed on my system (Spark 2.0). I have tried this on 2 different machines with Spark 2.0 installed and I face the same error on both. Anyway, on which version of Spark did you try the above code? – Satya Sep 25 '16 at 15:19
  • @Yaron, I checked the code on Spark 1.6 and it gives the expected result. Have you checked your code on Spark 2.0, or do you have any idea why the above code is not working on Spark 2.0? If you checked it on Spark 2.0, then I might have some issue with my Spark installation (though I am pretty sure the installation is fine). I will reinstall everything and check. – Satya Sep 26 '16 at 04:34
  • @Satya - I didn't check my code on Spark 2.0 (I wrote: "The working code below was modified to support Spark version 1.5.2, which I used to test your issue"). I think that you need to make some minor modifications in order to get Spark 1.5.2/1.6 code to run on Spark 2.0. – Yaron Sep 26 '16 at 05:18
  • @Satya - I suggest adding some debug checks (df1.show(), df2.show(), etc.); it might help you identify the source of the "null" in the JPG you provided. – Yaron Sep 26 '16 at 05:22
  • @Yaron, I will check how to run Spark 1.5 or 1.6 code on 2.0. And that null is expected, because my second join condition fails there (df1.mycount == df2.max_count, i.e. 1 == 2) and I did a left join, so the null appears in the result. – Satya Sep 26 '16 at 05:30
0

My workaround in Spark 2.0:

In each DataFrame I created a single combined column from the columns used in the join comparison ('name' and 'mycount' in df1, 'name2' and 'max_count' in df2). Now there is only one column to compare, and the join on that single column does not cause any issue.

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def combine_func(*args):
    # Convert non-string values to str, then concatenate them with '_'
    return '_'.join([str(x) for x in args])

combine_udf = udf(combine_func, StringType())  # wrap the function as a UDF
# Column holding the concatenated values of name and mycount, e.g. 'satya_3'
df1 = df1.withColumn('combined_new_1', combine_udf(df1['name'], df1['mycount']))
df2 = df2.withColumn('combined_new_2', combine_udf(df2['name2'], df2['max_count']))
# df1.columns == 'name', 'genre', 'mycount', 'combined_new_1'
# df2.columns == 'name2', 'max_count', 'combined_new_2'
# Now join
final_df = df1.join(df2, df1.combined_new_1 == df2.combined_new_2, 'inner')
# final_df = df1.join(df2, df1.combined_new_1 == df2.combined_new_2, 'inner').select('the columns you want')
final_df.show()  # It shows the result, trust me.

Please don't follow this unless you are in a hurry; it is better to search for a more reliable solution.
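One reason to treat the combined-column workaround with care, sketched in plain Python: joining values with '_' can map different inputs to the same key when a value itself contains the separator. The `combine` helper mirrors the `'_'.join` logic of `combine_func` above; the input values are hypothetical. With a name column and an integer count this particular collision may never arise, so take it as a caveat for the general technique rather than a bug report.

```python
def combine(*args):
    # Same idea as combine_func: stringify each value and join with '_'.
    return "_".join(str(x) for x in args)


# Two different (hypothetical) value pairs collapse to the same string key.
key_a = combine("x_1", 2)
key_b = combine("x", "1_2")
print(key_a, key_b)    # x_1_2 x_1_2
print(key_a == key_b)  # True: a join on this key would match both rows

# Keeping the values separate, here as tuples, preserves the distinction.
tuple_a = ("x_1", 2)
tuple_b = ("x", "1_2")
print(tuple_a == tuple_b)  # False
```

Comparing the original columns separately, as in the multi-condition joins shown earlier in the thread, does not have this problem.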

Satya