
I am trying to union two Spark dataframes with different sets of columns. For this purpose, I referred to the following link:

How to perform union on two DataFrames with different amounts of columns in spark?

My code is as follows -

import org.apache.spark.sql.functions.{col, lit}

val cols1 = finalDF.columns.toSet
val cols2 = df.columns.toSet
val total = cols1 ++ cols2

finalDF = finalDF.select(expr(cols1, total): _*)
  .unionAll(df.select(expr(cols2, total): _*))

// For every column name in the combined set, keep the column if this DataFrame
// has it, otherwise add a null literal under that name.
def expr(myCols: Set[String], allCols: Set[String]) = {
  allCols.toList.map(x => x match {
    case x if myCols.contains(x) => col(x)
    case _ => lit(null).as(x)
  })
}

But the problem I am facing is that some of the columns in both dataframes are nested. I have columns of both StructType and primitive types. Now, say column A (of StructType) is in df but not in finalDF. But in expr,

case _ => lit(null).as(x)

is not making it a StructType. That's why I am not able to union them. It gives me the following error:

org.apache.spark.sql.AnalysisException: Union can only be performed on tables with the compatible column types. NullType <> StructType(StructField(_VALUE,StringType,true), StructField(_id,LongType,true)) at the first column of the second table.
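From what I understand, the null literal would need to carry the struct's schema rather than NullType. Something along these lines is what I have in mind (only a sketch; here I pass in the other DataFrame so the missing column's type can be looked up from its schema):

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit}

// Sketch: cast the null to the type the column has in the DataFrame that does contain it,
// so a missing struct column becomes a typed null instead of NullType.
def expr(myCols: Set[String], allCols: Set[String], other: DataFrame) =
  allCols.toList.map {
    case x if myCols.contains(x) => col(x)
    case x => lit(null).cast(other.schema(x).dataType).as(x)
  }

But I am not sure this is the right approach for deeply nested columns.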

Any suggestions on what I can do here?

zero323
Ishan
  • please check this stackoverflow [thread](https://stackoverflow.com/questions/42530431/spark-union-fails-with-nested-json-dataframe). – jose praveen Jul 30 '17 at 10:48
  • @NinjaDev82 Yes, I can import all the files at the same time and it's working. But I need to add a column with a value extracted from the header of the same file (I'm importing an XML file into a Spark dataframe). So I import the data into a Spark dataframe, extract the header from it, and add that value as a new column in that df. This header value is different for each df. – Ishan Jul 30 '17 at 11:08

2 Answers


I'd use built-in schema inference for this. It is way more expensive, but much simpler than matching complex structures by hand and resolving possible conflicts:

spark.read.json(df1.toJSON.union(df2.toJSON))
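For example, a minimal sketch with two made-up frames whose columns only partially overlap (assuming a SparkSession named spark):

import spark.implicits._

// "a" is a nested struct present only in df1; "b" exists only in df2.
val df1 = spark.read.json(Seq("""{"id": 1, "a": {"_VALUE": "x", "_id": 10}}""").toDS)
val df2 = spark.read.json(Seq("""{"id": 2, "b": "y"}""").toDS)

// Serializing both sides to JSON and reading them back lets Spark infer a single schema
// that covers every column; rows missing a column simply get null for it.
val merged = spark.read.json(df1.toJSON.union(df2.toJSON))
merged.printSchema()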

You can also import all the files at the same time and join with the information extracted from the headers, using input_file_name:

import org.apache.spark.sql.functions.input_file_name

val metadata: DataFrame  // Just metadata from the header
val data: DataFrame      // All files loaded together

metadata.withColumn("file", input_file_name)
  .join(data.withColumn("file", input_file_name), Seq("file"))
Alper t. Turker
df = df1.join(df2, ['each', 'shared', 'column'], how='full')

will fill missing data with nulls.
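In Scala the same idea would look roughly like this (a sketch, with df1 and df2 standing in for the two frames):

// Join on every column the two frames share; the "full" (outer) join keeps rows from
// both sides and leaves null in the columns that only the other frame has.
val shared = df1.columns.intersect(df2.columns).toSeq
val joined = df1.join(df2, shared, "full")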

ehacinom