
I'm using PySpark with the HiveWarehouseConnector on an HDP3 cluster. The schema changed, so I updated my target table with an "alter table" command, which appended the new columns to the last positions by default. Now I'm trying to use the following code to save a Spark dataframe to the table, but the dataframe's columns are in alphabetical order and I'm getting the error message below:

df = spark.read.json(df_sub_path)
hive.setDatabase('myDB') 
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode('append').option('table','target_table').save()

The error traces to:

Caused by: java.lang.IllegalArgumentException: Hive column: column_x cannot be found at same index: 77 in dataframe. Found column_y. Aborting as this may lead to loading of incorrect data.

Is there any dynamic way of appending the dataframe to correct location in the hive table? It is important as I expect more columns to be added to the target table.

DieDen
  • You can try reading the target table and get the schema. Then you can translate this schema to your dataframe, to ensure schema consistency. – Shadowtrooper Feb 17 '21 at 10:48
  • @Shadowtrooper I'm trying to do something similar to this: target = hive.executeQuery('select * from target_Table where 1=0') test = spark.createDataFrame(source.collect(), target.schema) However, I'm now facing a new error that I'm still trying to debug; I suspect the columns weren't reordered. Error message: field speed: ArrayType(LongType,true) can not accept object 80.0 in type. The previous column has a value of 80.0 with FloatType, so it is not picking up the schema column order of the target table. – DieDen Feb 17 '21 at 12:13
  • You can try getting the columns and using a select: test = spark.createDataFrame(source.collect()).select(target.columns). Another option is to do a unionByName of target and test and save that. – Shadowtrooper Feb 17 '21 at 12:16
  • see if this answer helps with the approach that you can use : https://stackoverflow.com/questions/54457068/how-to-drop-a-column-from-a-databricks-delta-table/64140452#64140452 – Nikunj Kakadiya Feb 17 '21 at 12:21
  • @Shadowtrooper Yes indeed! I was so focused on using the schema that I forgot I could use the magical select :) Now I'm using something like this: test = df.select(target.columns) and then writing to the Hive table. I'd gladly accept your response as an answer :) – DieDen Feb 17 '21 at 12:30

1 Answer


You can read the target table with no rows to get its columns in table order. Then, using select, you can put the dataframe's columns in the correct order and append:

target = hive.executeQuery('select * from target_Table where 1=0')  # returns no rows, only the schema
test = spark.createDataFrame(source.collect())
test = test.select(target.columns)  # reorder columns to match the table
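The mechanism is simply selecting the source's columns by name in the order the target table declares them. A plain-Python sketch of what select(target.columns) does per row (the column names and values here are hypothetical):

```python
# Column order as the Hive target table declares it (hypothetical names).
target_columns = ["id", "speed", "column_x", "new_col"]

# A source record whose fields arrived in a different (e.g. alphabetical) order.
source_row = {"column_x": "a", "id": 1, "new_col": None, "speed": 80.0}

# Reorder the values to match the target's column positions by looking
# each target column up by name, exactly as select() matches by name.
reordered = [source_row[c] for c in target_columns]
print(reordered)  # [1, 80.0, 'a', None]
```

Note that this matches by name, so every column in the target table must exist in the dataframe; a column present in the table but missing from the dataframe makes the select fail rather than silently shifting data.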
Shadowtrooper