I got an error in PySpark:

AnalysisException: u'Resolved attribute(s) week#5230 missing from
longitude#4976,address#4982,minute#4986,azimuth#4977,province#4979,
action_type#4972,user_id#4969,week#2548,month#4989,postcode#4983,location#4981 
in operator !Aggregate [user_id#4969, week#5230], [user_id#4969, 
week#5230, count(distinct day#4987) AS days_per_week#3605L]. 
Attribute(s) with the same name appear in the operation: week. 
Please check if the right attribute(s) are used

This seems to come from a snippet of code where the agg function is used:

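from pyspark.sql.functions import count, countDistinct

# Parentheses let the method chain span several lines.
df_rs = (df_n.groupBy('user_id', 'week')
             .agg(countDistinct('day').alias('days_per_week'))
             .where('days_per_week >= 1')
             .groupBy('user_id')
             .agg(count('week').alias('weeks_per_user'))
             .where('weeks_per_user >= 5')
             .cache())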

However, I do not see the issue here, and I have run this same code on the same data many times before.

EDIT: I have been looking through the code and the type of error seems to come from joins of this sort:

df = df1.join(df2, 'user_id', 'inner')
df3 = df4.join(df1, 'user_id', 'left_anti')

but I still have not solved the problem.

EDIT2: Unfortunately, the suggested duplicate does not match my problem: this is not a question of column name ambiguity but of a missing attribute, which does not appear to be missing when I inspect the actual dataframes.
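
The mismatch is visible in the error text itself: the aggregate refers to week#5230, while the input only offers week#2548. As a rough diagnostic sketch in plain Python (no Spark needed), the message can be scanned for column names that occur with more than one attribute id. The helper name `conflicting_attributes` and the regular expression are illustrative assumptions, not part of PySpark.

```python
import re
from collections import defaultdict

# Matches tokens such as "week#5230" in a Spark AnalysisException message.
ATTR = re.compile(r"([A-Za-z_]\w*)#(\d+)")

def conflicting_attributes(message):
    """Hypothetical helper: return {column name: sorted attribute ids}
    for every name that appears with more than one id in the message."""
    ids_by_name = defaultdict(set)
    for name, attr_id in ATTR.findall(message):
        ids_by_name[name].add(int(attr_id))
    return {name: sorted(ids) for name, ids in ids_by_name.items() if len(ids) > 1}
```

Calling it on the message above should single out `week` with its two ids, which points at the dataframe lineage that produced two different `week` attributes.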

howie
Qubix
  • Possible duplicate of [Spark Dataframe distinguish columns with duplicated name](https://stackoverflow.com/questions/33778664/spark-dataframe-distinguish-columns-with-duplicated-name) – user10938362 Mar 15 '19 at 08:57
  • Can you provide your dataframe schema? – howie Mar 15 '19 at 10:09

2 Answers


I faced the same problem and solved it by renaming the columns named in the "Resolved attribute(s) ... missing" error to temporary names before the join. It is only a workaround, but I hope it helps you too. I do not know the real cause of this issue; it has been around since Spark 1.6 (see SPARK-10925).
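
A minimal sketch of that workaround, assuming the clashing column is `week` and the join key is `user_id` as in the question. The helper name `join_with_temp_rename` and the `_tmp` suffix are made up for illustration; only `DataFrame.withColumnRenamed` and `DataFrame.join` are real PySpark calls.

```python
def join_with_temp_rename(left_df, right_df, on, clashing_col, how="inner", tmp_suffix="_tmp"):
    """Hypothetical helper: rename `clashing_col` on the left side to a
    temporary name, then join. The temporary name is kept in the result,
    so the caller can rename it back or drop it afterwards."""
    tmp_name = clashing_col + tmp_suffix
    # withColumnRenamed returns a new DataFrame; left_df itself is unchanged.
    renamed_left = left_df.withColumnRenamed(clashing_col, tmp_name)
    return renamed_left.join(right_df, on, how)
```

For example, `join_with_temp_rename(df1, df2, 'user_id', 'week')` would stand in for `df1.join(df2, 'user_id', 'inner')`, with the left-hand `week` column appearing as `week_tmp` in the result.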

ganesh_patil

I also hit this issue several times. According to an article I came across, it is a Spark-related bug. Based on that article, I came up with the code below, which resolved my issue.

The code handles LEFT, RIGHT, INNER and OUTER joins; OUTER is treated as FULL OUTER here.

def join_spark_dfs_sqlbased(sparkSession,left_table_sdf,right_table_sdf,common_join_cols_list=None,join_type="LEFT"):
    # Temporary suffix added to every column before the join and stripped afterwards.
    temp_join_afix="_tempjoincolrenames"
    join_type=join_type.upper()
    
    left=left_table_sdf.select(left_table_sdf.columns)
    right=right_table_sdf.select(right_table_sdf.columns)
    
    # If no join columns are given, join on every column name the two tables share.
    if not common_join_cols_list:
        common_join_cols_list = list(set(left.columns).intersection(right.columns))
    common_join_cols_list=[col+temp_join_afix for col in common_join_cols_list]
        
    
    for col in left.columns:
        left = left.withColumnRenamed(col, col + temp_join_afix)
    left.createOrReplaceTempView('left')
     
    
    for col in right.columns:        
        right = right.withColumnRenamed(col, col + temp_join_afix)               
    right.createOrReplaceTempView('right')
                            
    
    
              
    non_common_cols_left_list=list(set(left.columns)-set(common_join_cols_list))
    non_common_cols_right_list=list(set(right.columns)-set(common_join_cols_list))
    
    
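    # Columns present in both tables but not used as join keys; only one copy of each is selected.
    unidentified_common_cols=list(set(non_common_cols_left_list).intersection(non_common_cols_right_list))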
    if join_type in ['LEFT','INNER','OUTER']:
        non_common_cols_right_list=list(set(non_common_cols_right_list)-set(unidentified_common_cols))
        common_join_cols_list_with_table=['a.'+col +' as '+col for col in common_join_cols_list]
    else:
        non_common_cols_left_list=list(set(non_common_cols_left_list)-set(unidentified_common_cols))
        common_join_cols_list_with_table=['b.'+col +' as '+col for col in common_join_cols_list]
                
    
    non_common_cols_left_list_with_table=['a.'+col +' as '+col for col in non_common_cols_left_list]        
    non_common_cols_right_list_with_table=['b.'+col +' as '+col for col in non_common_cols_right_list]
    
    non_common_cols_list_with_table=non_common_cols_left_list_with_table + non_common_cols_right_list_with_table    
    
    if join_type=="OUTER":
        join_type="FULL OUTER"
        
    join_type=join_type+" JOIN"
    select_cols=common_join_cols_list_with_table+non_common_cols_list_with_table
    
    common_join_cols_list_with_table_join_query=['a.'+col+ '='+'b.'+col for col in common_join_cols_list]

    query= "SELECT "+ ",".join(select_cols) + " FROM " + "left" + " a " + join_type + " " + "right" + " b" +" ON "+ " AND ".join(common_join_cols_list_with_table_join_query)
    
    print("query:",query)
    
    joined_sdf= sparkSession.sql(query)
        
    for col in joined_sdf.columns:     
        if temp_join_afix in col:   
            joined_sdf = joined_sdf.withColumnRenamed(col, col.replace(temp_join_afix,''))
            
    return joined_sdf
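
To make it easier to see what SQL ends up being sent to Spark, here is a simplified, Spark-free sketch of just the query-building step. `build_join_query` is a hypothetical helper, not part of the function above; it assumes that a non-key column found on both sides should be selected from one side only (the left side, except for RIGHT joins), and it uses ordered lists instead of sets so the output is deterministic.

```python
def build_join_query(left_cols, right_cols, join_cols, join_type="LEFT",
                     suffix="_tempjoincolrenames"):
    """Hypothetical helper: return the SQL text for joining the temp views
    `left` (alias a) and `right` (alias b) on the suffixed join columns."""
    join_type = join_type.upper()
    keys = [c + suffix for c in join_cols]
    # Non-key columns of each side, in input order, with the temporary suffix.
    left_rest = [c + suffix for c in left_cols if c not in join_cols]
    right_rest = [c + suffix for c in right_cols if c not in join_cols]
    if join_type == "RIGHT":
        # Shared non-key names come from the right side; keys too.
        left_rest = [c for c in left_rest if c not in right_rest]
        key_side = "b"
    else:
        # Shared non-key names come from the left side; keys too.
        right_rest = [c for c in right_rest if c not in left_rest]
        key_side = "a"
    select_cols = (
        [f"{key_side}.{c} AS {c}" for c in keys]
        + [f"a.{c} AS {c}" for c in left_rest]
        + [f"b.{c} AS {c}" for c in right_rest]
    )
    sql_join = "FULL OUTER JOIN" if join_type == "OUTER" else join_type + " JOIN"
    on_clause = " AND ".join(f"a.{c} = b.{c}" for c in keys)
    return f"SELECT {', '.join(select_cols)} FROM left a {sql_join} right b ON {on_clause}"
```

Printing the result for a couple of small column lists is a quick way to check the generated SELECT and ON clauses before running the real join.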
Pranjal Gharat