
I tried taking the schema of one file via df.schema() as a common schema and loading all the CSV files with it, but that fails because the headers of the other CSV files do not match the assigned schema.
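
A minimal sketch of the failing approach described above (file paths are hypothetical) -

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Take the schema of one file as the "common" schema ...
    common_schema = spark.read.option("header", True).csv("/data/file1.csv").schema

    # ... and force it onto every file; with a user-specified schema, CSV columns
    # are matched by position rather than by header name, so files whose headers
    # differ come out misaligned
    df = spark.read.option("header", True).schema(common_schema).csv("/data/*.csv")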

Any suggestions, such as a function or Spark script, would be appreciated.


1 Answer


As I understand it, you want to union/merge files with different schemas (each a subset of one master schema). I wrote this function, unionPro, which I think just suits your requirement -

EDIT - Added a PySpark version

import org.apache.spark.sql.{Column, DataFrame, Row, SparkSession}
import org.apache.spark.sql.functions.{col, lit}
import org.apache.spark.sql.types.{StructField, StructType}

def unionPro(DFList: List[DataFrame], caseDiff: String = "Y"): DataFrame = {

    val spark: SparkSession = SparkSession.active

    /**
     * Accepts DataFrames with the same or a different schema/column order,
     * with some or no common columns, and creates a unioned DataFrame.
     */

    // Note: collecting column names into a Set would NOT preserve their order:
    // val masterColList2 = DFList.map(_.columns.toSet).flatMap(x => x).toSet

    // Optionally lower-case all column names so case-different headers are treated as one column
    val inputDFList = if (caseDiff == "N")
      DFList
    else {
      DFList.map(df => {

        val cols = df.columns

        val selector = cols.map(x => col(x).alias(x.toLowerCase))

        df.select(selector: _*)

      })
    }

    //"This Preserves Order------------------------------------"
    val masterColStrList: Array[String] = inputDFList.map(x => x.columns).reduce((x, y) => (x.union(y))).distinct

    //val masterColList = ???

    // Create masterSchema, ignoring DataType/Nullable differences between StructFields
    // and treating fields with the same name (ignoring case) as the same column
    val ignoreNullable: StructField => StructField = x => StructField(x.name, x.dataType, true)

    val masterSchema = StructType(inputDFList.map(_.schema.fields.map(ignoreNullable)).reduce((x, y) => x.union(y)).groupBy(_.name.toLowerCase).map(_._2.head).toArray)

    // For each DataFrame, select existing columns as-is and null-fill the missing ones
    def unionExpr(myCols: Seq[String], allCols: Seq[String]): Seq[Column] = {
      allCols.map {
        case x if myCols.contains(x) => col(x)
        case x => lit(null).as(x)
      }
    }

    // Create an empty DataFrame with the master schema, columns in the preserved order

    val masterEmptyDF = spark.createDataFrame(spark.sparkContext.emptyRDD[Row], masterSchema).select(masterColStrList.head, masterColStrList.tail: _*)


    // For union/unionAll the column order must match, so masterColStrList is passed
    // to unionExpr to put every DataFrame's columns in the same order; unionByName
    // would be order-safe otherwise
    inputDFList.map(df => df.select(unionExpr(df.columns, masterColStrList): _*)).foldLeft(masterEmptyDF)((x, y) => x.unionByName(y))

  }

Here is the sample test for it -


    import spark.implicits._

    val aDF = Seq(("A", 1), ("B", 2)).toDF("Name", "ID")
    val bDF = Seq(("C", 1), ("D", 2)).toDF("Name", "Sal")

    // caseDiff = "N": the column-name cases already match, so the original
    // casing is kept in the output
    unionPro(List(aDF, bDF), "N").show

Which gives this output -

+----+----+----+
|Name|  ID| Sal|
+----+----+----+
|   A|   1|null|
|   B|   2|null|
|   C|null|   1|
|   D|null|   2|
+----+----+----+

Here's the PySpark version of it -

from functools import reduce
from typing import List

from pyspark.sql import Column, DataFrame, SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StructField, StructType


def unionPro(DFList: List[DataFrame], caseDiff: str = "N") -> DataFrame:
    """
    :param DFList: DataFrames with the same or a different schema/column order,
                   with some or no common columns
    :param caseDiff: "Y" to treat case-different column names as the same column
    :return: a unioned DataFrame
    """
    spark = SparkSession.builder.getOrCreate()

    # Optionally lower-case all column names to handle case differences
    inputDFList = DFList if caseDiff == "N" else [df.select([F.col(x).alias(x.lower()) for x in df.columns]) for df in DFList]

    # This preserves column order (column names are hashable strings, and
    # OrderedDict keeps insertion order)
    from collections import OrderedDict
    masterColStrList = list(OrderedDict.fromkeys(reduce(lambda x, y: x + y, [df.columns for df in inputDFList])))

    # Create masterSchema, ignoring DataType/Nullable differences between StructFields
    # and treating fields as the same based on name
    ignoreNullable = lambda x: StructField(x.name, x.dataType, True)

    import itertools

    # itertools.groupby only groups *consecutive* items, so the iterable must be
    # sorted by the grouping key first (passed to sorted as the keyword argument
    # `key`); sorting loses the original column order, hence masterColStrList is
    # used when returning the final DF
    masterSchema = StructType([list(y)[0] for x, y in itertools.groupby(
        sorted(reduce(lambda x, y: x + y, [[ignoreNullable(x) for x in df.schema.fields] for df in inputDFList]),
               key=lambda x: x.name),
        lambda x: x.name)])

    # For each DataFrame, select existing columns as-is and null-fill the missing ones
    def unionExpr(myCols: List[str], allCols: List[str]) -> List[Column]:
        return [F.col(x) if x in myCols else F.lit(None).alias(x) for x in allCols]

    # Create an empty DataFrame with the master schema
    masterEmptyDF = spark.createDataFrame([], masterSchema)

    return reduce(lambda x, y: x.unionByName(y),
                  [df.select(unionExpr(df.columns, masterColStrList)) for df in inputDFList], masterEmptyDF).select(
        masterColStrList)
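
And a sample test for the PySpark version, mirroring the Scala one above (a minimal sketch; it assumes an active SparkSession and the imports shown with the function) -

    spark = SparkSession.builder.getOrCreate()

    aDF = spark.createDataFrame([("A", 1), ("B", 2)], ["Name", "ID"])
    bDF = spark.createDataFrame([("C", 1), ("D", 2)], ["Name", "Sal"])

    # Produces the same null-padded output as the Scala sample above
    unionPro([aDF, bDF]).show()

    # Applied to the original question (hypothetical paths): read each CSV with
    # its own header first, then merge them
    # dfs = [spark.read.option("header", True).csv(p) for p in ["/data/a.csv", "/data/b.csv"]]
    # merged = unionPro(dfs)

    # Note: on Spark 3.1+ the built-in unionByName(allowMissingColumns=True)
    # covers the two-DataFrame case directly:
    # aDF.unionByName(bDF, allowMissingColumns=True).show()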

ValaravausBlack
  • Could you write it in PySpark as well somehow? Would be appreciated! I am especially curious about the `masterSchema` part. – Anna Taylor Aug 26 '20 at 14:26
  • @AnnaTaylor Added a PySpark version. There is some stuff around the caseDiff param that is irrelevant to the core idea; it handles different-case column names I had encountered while working, so you can ignore it. I used itertools and had to use sorted to get the desired results with it, as mentioned in the code comments (see the short groupby demo below). – ValaravausBlack Aug 29 '20 at 10:53
  • Could you include your imports for the PySpark version so people don't have to hunt down every one if they just want to try this out to see if it works? Btw this resulted in: `Union can only be performed on tables with the compatible column types` :) – oneirois Jan 15 '21 at 01:50
  • Tested it with `val bDF = Seq(("C", 1), ("D", 2)).toDF("NAME", "Sal")` and got distinct 'Name' and 'NAME' columns. I was expecting them to map to a single 'NAME' column. – mike scholtes May 04 '21 at 21:34
  • @mikescholtes It can take one more parameter, which tells it there can be case differences in column names. I've edited the PySpark & Scala code for the same. – ValaravausBlack Dec 23 '21 at 15:48
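
To illustrate the sorted-before-groupby point from the comments: itertools.groupby only groups consecutive equal keys, so without sorting, equal column names that are not adjacent end up in separate groups (a minimal sketch with made-up names) -

    from itertools import groupby

    names = ["id", "name", "ID", "sal"]

    # Unsorted input: "id" and "ID" are not adjacent, so they form separate groups
    print([k for k, _ in groupby(names, key=str.lower)])
    # ['id', 'name', 'id', 'sal']

    # Sorted by the same key first: each name forms exactly one group
    print([k for k, _ in groupby(sorted(names, key=str.lower), key=str.lower)])
    # ['id', 'name', 'sal']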