
My data frame 1:

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
4295876332|^|41|^|40|^|1|^|I|!|
4295876332|^|41|^|110|^|2|^|I|!|
4295876332|^|41|^|111|^|2|^|I|!|
4295876332|^|138|^|139|^|1|^|I|!|
4295876332|^|138|^|193|^|2|^|I|!|
4295877204|^|38|^|37|^|1|^|I|!|
4295877204|^|38|^|103|^|2|^|I|!|
4295877204|^|38|^|104|^|2|^|I|!|
4295877204|^|131|^|132|^|1|^|I|!|
4295877204|^|131|^|178|^|2|^|I|!|
4295877234|^|7|^|100|^|1|^|I|!|
4295877234|^|7|^|137|^|2|^|I|!|
4295877234|^|7|^|138|^|2|^|I|!|
4295877234|^|158|^|188|^|1|^|I|!|
4295877234|^|158|^|210|^|2|^|I|!|
4295877320|^|41|^|40|^|1|^|I|!|
4295877320|^|41|^|107|^|2|^|I|!|
4295877320|^|41|^|108|^|2|^|I|!|
4295877320|^|135|^|136|^|1|^|I|!|
4295877320|^|135|^|190|^|2|^|I|!|
4295877413|^|41|^|40|^|1|^|I|!|
4295877413|^|41|^|108|^|2|^|I|!|
4295877413|^|41|^|109|^|2|^|I|!|
4295877413|^|138|^|139|^|1|^|I|!|
4295877413|^|138|^|190|^|2|^|I|!|
4295877734|^|41|^|40|^|1|^|I|!|
4295877734|^|41|^|121|^|2|^|I|!|
4295877734|^|41|^|122|^|2|^|I|!|
4295877734|^|136|^|137|^|1|^|I|!|
4295877734|^|136|^|188|^|2|^|I|!|
4295878126|^|41|^|40|^|1|^|I|!|
4295878126|^|41|^|106|^|2|^|I|!|
4295878126|^|41|^|107|^|2|^|I|!|
4295878126|^|134|^|135|^|1|^|I|!|
4295878126|^|134|^|181|^|2|^|I|!|
4295880491|^|6|^|172|^|2|^|I|!|
4295880491|^|6|^|173|^|2|^|I|!|
4295880491|^|171|^|174|^|2|^|I|!|
4295876139|^|41|^|40|^|1|^|I|!|
4295876139|^|41|^|122|^|2|^|I|!|
4295876139|^|41|^|123|^|2|^|I|!|
4295876139|^|134|^|135|^|1|^|I|!|
4295876139|^|134|^|188|^|2|^|I|!|
4295876509|^|41|^|40|^|1|^|I|!|
4295876509|^|41|^|118|^|2|^|I|!|
4295876509|^|41|^|119|^|2|^|I|!|
4295876509|^|134|^|135|^|1|^|I|!|
4295876509|^|134|^|185|^|2|^|I|!|
4295876547|^|3|^|100|^|1|^|I|!|
4295876547|^|3|^|130|^|2|^|I|!|
4295876547|^|3|^|131|^|2|^|I|!|
4295876547|^|153|^|185|^|1|^|I|!|
4295876547|^|153|^|202|^|2|^|I|!|
4295876646|^|5|^|104|^|1|^|I|!|
4295876646|^|5|^|150|^|2|^|I|!|
4295876646|^|5|^|151|^|2|^|I|!|
4295876646|^|162|^|195|^|1|^|I|!|
4295876646|^|162|^|217|^|2|^|I|!|
4295876738|^|41|^|40|^|1|^|I|!|
4295876738|^|41|^|106|^|2|^|I|!|
4295876738|^|41|^|107|^|2|^|I|!|
4295876738|^|134|^|135|^|1|^|I|!|
4295876738|^|134|^|187|^|2|^|I|!|
4295877225|^|41|^|40|^|1|^|I|!|
4295877225|^|41|^|122|^|2|^|I|!|
4295877225|^|41|^|123|^|2|^|I|!|
4295877225|^|134|^|135|^|1|^|I|!|
4295877225|^|134|^|188|^|2|^|I|!|
4295877766|^|41|^|40|^|1|^|I|!|
4295877766|^|41|^|106|^|2|^|I|!|
4295877766|^|41|^|107|^|2|^|I|!|
4295877766|^|134|^|135|^|1|^|I|!|
4295877766|^|134|^|186|^|2|^|I|!|
4295877812|^|41|^|40|^|1|^|I|!|
4295877812|^|41|^|112|^|2|^|I|!|
4295877812|^|41|^|113|^|2|^|I|!|
4295877812|^|134|^|135|^|1|^|I|!|
4295877812|^|134|^|186|^|2|^|I|!|
4295877871|^|41|^|40|^|1|^|I|!|
4295877871|^|41|^|124|^|2|^|I|!|
4295877871|^|41|^|125|^|2|^|I|!|
4295877871|^|137|^|138|^|1|^|I|!|
4295877871|^|137|^|190|^|2|^|I|!|
4295877923|^|41|^|40|^|1|^|I|!|
4295877923|^|41|^|122|^|2|^|I|!|
4295877923|^|41|^|123|^|2|^|I|!|
4295877923|^|134|^|135|^|1|^|I|!|
4295877923|^|134|^|188|^|2|^|I|!|
4295877985|^|41|^|40|^|1|^|I|!|
4295877985|^|41|^|113|^|2|^|I|!|
4295877985|^|41|^|114|^|2|^|I|!|
4295877985|^|134|^|135|^|1|^|I|!|
4295877985|^|134|^|188|^|2|^|I|!|
4295878608|^|41|^|40|^|1|^|I|!|
4295878608|^|41|^|105|^|2|^|I|!|
4295878608|^|41|^|106|^|2|^|I|!|
4295878608|^|130|^|131|^|1|^|I|!|
4295878608|^|130|^|182|^|2|^|I|!|
4295878863|^|41|^|40|^|1|^|I|!|
4295878863|^|41|^|121|^|2|^|I|!|
4295878863|^|41|^|122|^|2|^|I|!|
4295878863|^|134|^|135|^|1|^|I|!|
4295878863|^|134|^|187|^|2|^|I|!|
4295880574|^|166|^|167|^|2|^|I|!|
4295880574|^|166|^|168|^|2|^|I|!|
4295880574|^|273|^|274|^|2|^|I|!|
4295876308|^|41|^|40|^|1|^|I|!|
4295876308|^|41|^|103|^|2|^|I|!|
4295876308|^|41|^|104|^|2|^|I|!|
4295876308|^|130|^|131|^|1|^|I|!|
4295876308|^|130|^|177|^|2|^|I|!|

My data frame 2:

DataPartition|^|PartitionYear|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
SelfSourcedPublic|^|2016|^|1515129638858|^|4295902451|^|109|^|110|^|1|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638859|^|4295902451|^|111|^|112|^|1|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638860|^|4295902451|^|109|^|113|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638861|^|4295902451|^|109|^|114|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638862|^|4295902451|^|111|^|115|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638863|^|4295902451|^|109|^|119|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638864|^|4295902451|^|109|^|120|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638865|^|4295902451|^|111|^|121|^|4|^|O|!|
SelfSourcedPublic|^|2017|^|1515129638866|^|4295902451|^|122|^|126|^|2|^|O|!|
SelfSourcedPublic|^|2017|^|1515129638867|^|4295902451|^|122|^|127|^|2|^|O|!|
SelfSourcedPublic|^|2017|^|1515129639565|^|4295859031|^|126|^|127|^|1|^|I|!|
SelfSourcedPublic|^|2017|^|1515129639566|^|4295859031|^|128|^|129|^|1|^|I|!|
SelfSourcedPublic|^|2017|^|1515129639688|^|4295859031|^|null|^|126|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639689|^|4295859031|^|null|^|127|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639690|^|4295859031|^|null|^|128|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639691|^|4295859031|^|null|^|129|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639713|^|4295906830|^|null|^|420|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639714|^|4295906830|^|null|^|421|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639715|^|4295906830|^|null|^|422|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639741|^|4295906830|^|null|^|420|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639742|^|4295906830|^|null|^|421|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639743|^|4295906830|^|null|^|422|^|null|^|D|!|
SelfSourcedPrivate|^|2014|^|1515129639770|^|4298009288|^|171|^|206|^|2|^|O|!|
SelfSourcedPrivate|^|2014|^|1515129639771|^|4298009288|^|143|^|203|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639809|^|4298009288|^|167|^|168|^|4|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129639810|^|4298009288|^|163|^|195|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639811|^|4298009288|^|163|^|196|^|1|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639812|^|4298009288|^|167|^|197|^|3|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639813|^|4298009288|^|167|^|198|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639814|^|4298009288|^|30|^|29|^|4|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129639815|^|4298009288|^|22|^|73|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639816|^|4298009288|^|22|^|75|^|1|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639817|^|4298009288|^|30|^|76|^|3|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639818|^|4298009288|^|30|^|78|^|2|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640008|^|4298009288|^|163|^|164|^|4|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640009|^|4298009288|^|161|^|191|^|3|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640010|^|4298009288|^|161|^|192|^|2|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640011|^|4298009288|^|161|^|193|^|1|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640012|^|4298009288|^|163|^|194|^|3|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640013|^|4298009288|^|22|^|24|^|4|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640014|^|4298009288|^|19|^|66|^|3|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640015|^|4298009288|^|19|^|68|^|2|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640016|^|4298009288|^|19|^|70|^|1|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640017|^|4298009288|^|22|^|71|^|3|^|O|!|
SelfSourcedPrivate|^|2010|^|1515129640132|^|4298009288|^|155|^|183|^|2|^|O|!|
SelfSourcedPrivate|^|2010|^|1515129640133|^|4298009288|^|10|^|53|^|2|^|O|!|
SelfSourcedPublic|^|2017|^|1515129640204|^|4295904170|^|null|^|379|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640205|^|4295904170|^|null|^|380|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640206|^|4295904170|^|null|^|384|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640313|^|4295904170|^|null|^|379|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640314|^|4295904170|^|null|^|380|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640315|^|4295904170|^|null|^|384|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640528|^|4295904170|^|381|^|379|^|3|^|O|!|
SelfSourcedPublic|^|2017|^|1515129640529|^|4295904170|^|381|^|380|^|3|^|O|!|
SelfSourcedPublic|^|2017|^|1515129640530|^|4295904170|^|381|^|383|^|4|^|I|!|
SelfSourcedPublic|^|2017|^|1515129640531|^|4295904170|^|385|^|384|^|4|^|I|!|
SelfSourcedPublic|^|2017|^|1515129641126|^|4295904170|^|372|^|379|^|3|^|O|!|
SelfSourcedPublic|^|2017|^|1515129641127|^|4295904170|^|372|^|380|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641505|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641506|^|4295858941|^|24|^|25|^|5|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641507|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641508|^|4295858941|^|30|^|31|^|3|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641509|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641510|^|4295858941|^|30|^|32|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641511|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641512|^|4295858941|^|24|^|33|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641513|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641514|^|4295858941|^|24|^|34|^|20|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641515|^|4295858941|^|1|^|2|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641516|^|4295858941|^|1|^|3|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641517|^|4295858941|^|5|^|6|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641518|^|4295858941|^|5|^|7|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641519|^|4295858941|^|12|^|10|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641520|^|4295858941|^|12|^|11|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641521|^|4295858941|^|1|^|13|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641522|^|4295858941|^|12|^|14|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641523|^|4295858941|^|5|^|15|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641524|^|4295858941|^|5|^|16|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641525|^|4295858941|^|1|^|17|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641526|^|4295858941|^|1|^|18|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641527|^|4295858941|^|5|^|19|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641528|^|4295858941|^|5|^|20|^|2|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641529|^|4295858941|^|5|^|21|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641530|^|4295858941|^|1|^|22|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641531|^|4295858941|^|1|^|23|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129641532|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1515129641603|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1515129641604|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1515129641605|^|4295858941|^|null|^|37|^|null|^|D|!|
SelfSourcedPrivate|^|2016|^|1515129641752|^|4298009288|^|232|^|242|^|4|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641753|^|4298009288|^|248|^|249|^|1|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641754|^|4298009288|^|248|^|249|^|1|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641755|^|4298009288|^|230|^|240|^|4|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641756|^|4298009288|^|243|^|247|^|1|^|O|!|
SelfSourcedPrivate|^|2017|^|1515129641757|^|4298009288|^|248|^|252|^|2|^|O|!|
SelfSourcedPrivate|^|2017|^|1515129641758|^|4298009288|^|248|^|255|^|3|^|O|!|
ThirdPartyPrivate|^|2016|^|1515129641866|^|4296803503|^|1|^|2|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1515129642192|^|4295907168|^|367|^|377|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129642193|^|4295907168|^|365|^|375|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129642194|^|4295907168|^|365|^|376|^|4|^|O|!|
Japan|^|2016|^|1515129642733|^|4295876606|^|272|^|278|^|3|^|O|!|
Japan|^|2016|^|1515129642734|^|4295876606|^|272|^|278|^|3|^|O|!|
Japan|^|2016|^|1515129642735|^|4295876606|^|270|^|276|^|2|^|O|!|
Japan|^|2016|^|1515129642736|^|4295876606|^|270|^|277|^|3|^|O|!|
Japan|^|2016|^|1515129642737|^|4295876606|^|270|^|279|^|3|^|O|!|
SelfSourcedPublic|^|2016|^|1515129657602|^|4296803503|^|1|^|2|^|1|^|O|!|

My full working code:

    // spark-shell style code: `sc` and `spark` are provided by the shell
    import org.apache.spark.{SparkConf, SparkContext}
    import java.sql.{Date, Timestamp}
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types._
    import org.apache.spark.sql.expressions._
    // col, lit, when, rank, first, concat_ws, regexp_replace, input_file_name, udf, ...
    import org.apache.spark.sql.functions._

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
    val get_cus_YearPartition = spark.udf.register("get_cus_YearPartition", (filePath: String) => filePath.split("\\.")(4))

    //Loading main data
    val rdd = sc.textFile("s3://trfsmallfffile/Interim2Annual/MAIN")
    val header = rdd.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
    val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
    val data = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

    // replace(".", ".") is a no-op: this schema keeps the original header names
    val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
    val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

    val df1resultFinal = data.withColumn("DataPartition", get_cus_val(input_file_name))
    val df1resultFinalWithYear = df1resultFinal.withColumn("PartitionYear", get_cus_YearPartition(input_file_name))

    //Loading Incremental
    val rdd1 = sc.textFile("s3://trfsmallfffile/Interim2Annual/INCR")
    val header1 = rdd1.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
    val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
    val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)

    //------------------------------- filtering only the latest from incremental ------------------------------
    val windowSpec = Window.partitionBy("OrganizationId", "AnnualPeriodId", "InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey1 = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")

    val windowSpec2 = Window.partitionBy("OrganizationId", "InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
      .filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
      .drop("tobefiltered", "TimeStamp")

    //-----------------separating the incremental df for insert, deletion and overwrite----------------

    //---------------insert rows are selected -------------------------------
    //insert a row if I is detected and if O is found then first delete and then insert
    val insertdf = latestForEachKey.filter($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)

    //------------------deleted rows with primary key "OrganizationId", "InterimPeriodId"------------------
    // delete rows from parent if either D or O is found in incremental
    val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

    //join by two primary keys for deletion and delete from the parent dataframe
    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

    val dfToSave = dfMainOutput.union(insertdf).withColumn("FFAction|!|", when($"FFAction|!|" === "O|!|" || $"FFAction|!|" === "I|!|", lit("I|!|")))

    val dfMainOutputFinal = dfToSave.na.fill("").select(
      $"DataPartition",
      $"PartitionYear",
      concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))

    val headerColumn = dataHeader.columns.toSeq

    // shadows the earlier `header` array; allowed in spark-shell, rename it in compiled code
    val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

    val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)

    dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition", "PartitionYear")
      .format("csv")
      .option("nullValue", "")
      .option("delimiter", "\t")
      .option("quote", "\u0000")
      .option("header", "true")
      .option("codec", "gzip")
      .save("s3://trfsmallfffile/Interim2Annual/output")

    val FFRowCount = dfMainOutputFinalWithoutNull.groupBy("DataPartition", "PartitionYear").count

    FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
      .option("rootTag", "FFFileType")
      .option("rowTag", "FFPhysicalFile")
      .save("s3://trfsmallfffile/Interim2Annual/Descr")
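The parsing above splits each line with the regex `\\|\\^\\|`, which matches the literal `|^|`. The `|!|` record terminator is not part of that delimiter, so it stays attached to the last field. That is why the last column is named `FFAction|!|` and why the filters compare against `"I|!|"`, `"O|!|"` and `"D|!|"`. Here is a minimal plain-Scala check of that behaviour (no Spark needed; `SplitDemo` is just an illustrative name), using the header and one line from data frame 1:

```scala
object SplitDemo {
  // Same regex as in the question: escaped |, ^, | so it matches the literal "|^|"
  val Delimiter = "\\|\\^\\|"

  def fields(line: String): Array[String] = line.split(Delimiter)

  def main(args: Array[String]): Unit = {
    val header = "OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|"
    val row    = "4295876139|^|134|^|135|^|1|^|I|!|"
    // The "|!|" terminator is not split off, so it ends up inside the last field
    println(fields(header).mkString(", ")) // OrganizationId, AnnualPeriodId, InterimPeriodId, InterimNumber, FFAction|!|
    println(fields(row).mkString(", "))    // 4295876139, 134, 135, 1, I|!|
  }
}
```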

In my output, two columns have their order interchanged:

(`AnnualPeriodId|^|InterimPeriodId`)

My output:

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
4295877812|^|40|^|41|^|1|^|I|!|
4295877234|^|188|^|158|^|1|^|I|!|
4295877320|^|136|^|135|^|1|^|I|!|
4295877225|^|135|^|134|^|1|^|I|!|
4295877766|^|40|^|41|^|1|^|I|!|
4295876332|^|110|^|41|^|2|^|I|!|
4295877812|^|113|^|41|^|2|^|I|!|
4295877320|^|190|^|135|^|2|^|I|!|
4295876308|^|40|^|41|^|1|^|I|!|
4295876646|^|195|^|162|^|1|^|I|!|
4295878608|^|106|^|41|^|2|^|I|!|
4295876738|^|107|^|41|^|2|^|I|!|
4295877812|^|186|^|134|^|2|^|I|!|
4295877734|^|121|^|41|^|2|^|I|!|
4295877413|^|108|^|41|^|2|^|I|!|
4295877766|^|107|^|41|^|2|^|I|!|
4295878608|^|131|^|130|^|1|^|I|!|
4295877985|^|40|^|41|^|1|^|I|!|
4295877923|^|122|^|41|^|2|^|I|!|
4295876308|^|177|^|130|^|2|^|I|!|
4295877413|^|109|^|41|^|2|^|I|!|
4295877225|^|40|^|41|^|1|^|I|!|
4295877413|^|139|^|138|^|1|^|I|!|
4295877766|^|106|^|41|^|2|^|I|!|
4295876308|^|104|^|41|^|2|^|I|!|
4295877204|^|132|^|131|^|1|^|I|!|
4295880574|^|167|^|166|^|2|^|I|!|
4295878126|^|106|^|41|^|2|^|I|!|
4295876509|^|119|^|41|^|2|^|I|!|
4295877734|^|188|^|136|^|2|^|I|!|
4295877923|^|188|^|134|^|2|^|I|!|
4295876139|^|135|^|134|^|1|^|I|!|
4295877413|^|190|^|138|^|2|^|I|!|
4295877225|^|122|^|41|^|2|^|I|!|
4295877812|^|135|^|134|^|1|^|I|!|
4295876646|^|151|^|5|^|2|^|I|!|
4295876139|^|188|^|134|^|2|^|I|!|
4295877225|^|188|^|134|^|2|^|I|!|
4295877234|^|210|^|158|^|2|^|I|!|
4295877923|^|123|^|41|^|2|^|I|!|
4295878863|^|135|^|134|^|1|^|I|!|
4295878863|^|121|^|41|^|2|^|I|!|
4295877234|^|100|^|7|^|1|^|I|!|
4295877812|^|112|^|41|^|2|^|I|!|
4295876332|^|193|^|138|^|2|^|I|!|
4295877225|^|123|^|41|^|2|^|I|!|
4295877320|^|107|^|41|^|2|^|I|!|
4295877734|^|137|^|136|^|1|^|I|!|
4295880574|^|274|^|273|^|2|^|I|!|
4295878608|^|105|^|41|^|2|^|I|!|
4295877320|^|40|^|41|^|1|^|I|!|
4295878608|^|40|^|41|^|1|^|I|!|
4295880491|^|173|^|6|^|2|^|I|!|
4295877985|^|114|^|41|^|2|^|I|!|
4295876646|^|217|^|162|^|2|^|I|!|
4295876738|^|187|^|134|^|2|^|I|!|
4295876509|^|40|^|41|^|1|^|I|!|
4295876139|^|123|^|41|^|2|^|I|!|
4295876509|^|118|^|41|^|2|^|I|!|
4295876646|^|104|^|5|^|1|^|I|!|
4295877234|^|137|^|7|^|2|^|I|!|
4295876547|^|185|^|153|^|1|^|I|!|
4295877734|^|122|^|41|^|2|^|I|!|
4295877766|^|186|^|134|^|2|^|I|!|
4295880574|^|168|^|166|^|2|^|I|!|
4295878126|^|107|^|41|^|2|^|I|!|
4295877234|^|138|^|7|^|2|^|I|!|
4295876738|^|135|^|134|^|1|^|I|!|
4295877766|^|135|^|134|^|1|^|I|!|
4295876646|^|150|^|5|^|2|^|I|!|
4295878126|^|135|^|134|^|1|^|I|!|
4295876139|^|122|^|41|^|2|^|I|!|
4295877204|^|103|^|38|^|2|^|I|!|
4295876332|^|111|^|41|^|2|^|I|!|
4295876332|^|139|^|138|^|1|^|I|!|
4295876308|^|103|^|41|^|2|^|I|!|
4295877734|^|40|^|41|^|1|^|I|!|
4295877871|^|190|^|137|^|2|^|I|!|
4295877923|^|135|^|134|^|1|^|I|!|
4295876547|^|130|^|3|^|2|^|I|!|
4295878863|^|122|^|41|^|2|^|I|!|
4295877204|^|104|^|38|^|2|^|I|!|
4295877985|^|135|^|134|^|1|^|I|!|
4295877871|^|138|^|137|^|1|^|I|!|
4295876332|^|40|^|41|^|1|^|I|!|
4295877871|^|124|^|41|^|2|^|I|!|
4295876139|^|40|^|41|^|1|^|I|!|
4295877204|^|178|^|131|^|2|^|I|!|
4295877413|^|40|^|41|^|1|^|I|!|
4295876509|^|185|^|134|^|2|^|I|!|
4295876308|^|131|^|130|^|1|^|I|!|
4295877871|^|125|^|41|^|2|^|I|!|
4295876738|^|106|^|41|^|2|^|I|!|
4295877923|^|40|^|41|^|1|^|I|!|
4295877985|^|188|^|134|^|2|^|I|!|
4295878126|^|40|^|41|^|1|^|I|!|
4295878863|^|40|^|41|^|1|^|I|!|
4295877204|^|37|^|38|^|1|^|I|!|
4295878608|^|182|^|130|^|2|^|I|!|
4295877320|^|108|^|41|^|2|^|I|!|
4295876547|^|100|^|3|^|1|^|I|!|
4295876547|^|131|^|3|^|2|^|I|!|
4295876547|^|202|^|153|^|2|^|I|!|
4295877871|^|40|^|41|^|1|^|I|!|
4295878863|^|187|^|134|^|2|^|I|!|
4295880491|^|172|^|6|^|2|^|I|!|
4295876738|^|40|^|41|^|1|^|I|!|
4295877985|^|113|^|41|^|2|^|I|!|
4295876509|^|135|^|134|^|1|^|I|!|
4295880491|^|174|^|171|^|2|^|I|!|
4295878126|^|181|^|134|^|2|^|I|!|

For example, in data frame 1 the following record is in this order:

4295876139|^|134|^|135|^|1|^|I|!|

But in the output I get it in this order:

4295876139|^|135|^|134|^|1|^|I|!|

This does not happen when the data has the I flag.
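The symptom can be reproduced without Spark: concatenating the same record's values in an order where InterimPeriodId comes before AnnualPeriodId gives exactly the swapped line above. This is a minimal plain-Scala model, assuming a row can be represented as a map from column name to value (a simplification of what `concat_ws` does per row); `ConcatOrderDemo` and its members are illustrative names:

```scala
object ConcatOrderDemo {
  // One record from data frame 1, keyed by column name (simplified, hypothetical representation)
  val row: Map[String, String] = Map(
    "OrganizationId"  -> "4295876139",
    "AnnualPeriodId"  -> "134",
    "InterimPeriodId" -> "135",
    "InterimNumber"   -> "1",
    "FFAction|!|"     -> "I|!|"
  )

  // Column order of the input header
  val headerOrder = Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction|!|")
  // The same columns with InterimPeriodId moved in front of AnnualPeriodId
  val swappedOrder = Seq("OrganizationId", "InterimPeriodId", "AnnualPeriodId", "InterimNumber", "FFAction|!|")

  // Look up each column by name and join with "|^|", like concat_ws("|^|", cols: _*)
  def concat(order: Seq[String]): String = order.map(name => row(name)).mkString("|^|")

  def main(args: Array[String]): Unit = {
    println(concat(headerOrder))  // 4295876139|^|134|^|135|^|1|^|I|!|
    println(concat(swappedOrder)) // 4295876139|^|135|^|134|^|1|^|I|!|
  }
}
```

So the values themselves are intact; only the column order used when building the output string differs.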

This is because of the following lines in my code:

    val windowSpec2 = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)

and

    val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

and

    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

In the case of Insert (I), the key columns are:

"OrganizationId","AnnualPeriodId","InterimPeriodId"

In the case of O or D, the key columns are:

"OrganizationId","InterimPeriodId"
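The three-column key is what the first window (`windowSpec`) partitions by: within each key, only the row with the latest TimeStamp survives. Below is a rough plain-Scala model of that step (not Spark code; the case class `IncrRow` and its field names are hypothetical), using the duplicated deletions for 4295906830 from data frame 2:

```scala
object LatestPerKeyDemo {
  // Hypothetical, simplified row: only the columns the first window uses
  final case class IncrRow(orgId: String, annualPeriodId: String, interimPeriodId: String,
                           timeStamp: Long, ffAction: String)

  // Rows from data frame 2: each deletion for 4295906830 appears twice
  val rows: Seq[IncrRow] = Seq(
    IncrRow("4295906830", "null", "420", 1515129639713L, "D|!|"),
    IncrRow("4295906830", "null", "421", 1515129639714L, "D|!|"),
    IncrRow("4295906830", "null", "420", 1515129639741L, "D|!|"),
    IncrRow("4295906830", "null", "421", 1515129639742L, "D|!|")
  )

  // Group by the same three columns as windowSpec and keep the row with the largest TimeStamp
  def latestPerKey(rs: Seq[IncrRow]): Seq[IncrRow] =
    rs.groupBy(r => (r.orgId, r.annualPeriodId, r.interimPeriodId))
      .values
      .map(_.maxBy(_.timeStamp))
      .toSeq
      .sortBy(_.interimPeriodId) // sorted only so the output order is deterministic

  def main(args: Array[String]): Unit =
    latestPerKey(rows).foreach(println)
}
```

One difference from the Spark version: `rank()` keeps every row tied for the highest TimeStamp, whereas `maxBy` keeps exactly one. The timestamps in the sample are unique, so both give the same result here.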

The output above is what I get, with the data frame 1 columns exchanged.

I hope I have explained it clearly.

marc_s
  • Can you please explain, with an example from the input and output, how the columns are interchanged? Currently you don't have any D or O in your input data. – Ramesh Maharjan Jan 05 '18 at 02:48
  • @RameshMaharjan It's not the INCR data. Whatever data is present in data frame 1 comes out interchanged, even without any D or O. –  Jan 05 '18 at 04:16
  • @RameshMaharjan I've added all the records and the output too; please have a look. –  Jan 05 '18 at 05:34
  • So for Insert (`I`) data the order is correct, but in the case of `O` and `D` the columns are interchanged. –  Jan 05 '18 at 08:15
  • please see my answer below :) – Ramesh Maharjan Jan 05 '18 at 10:40

1 Answer


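The interchange happened when you joined the main df1resultFinalWithYear with deletedf. When you join on a Seq of column names, Spark puts the join columns first, in the order given, followed by the remaining columns of the left and then the right DataFrame. You joined with Seq("OrganizationId", "InterimPeriodId"), so InterimPeriodId came before AnnualPeriodId. In insertdf and headerColumn, however, the original order is kept. Because union matches columns by position rather than by name, and the concatenation uses dfMainOutput.schema.fieldNames, the rows coming from the main data end up with these two values swapped while the inserted rows look correct. So the interchange happened in the following line: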

    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")
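You can see the reordering directly by comparing `columns` before and after such a join. This is a minimal local sketch, assuming a Spark 2.x/3.x `spark-sql` dependency on the classpath; the object and method names are made up for illustration, and the data is one record from data frame 1 plus a dummy delete key:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.lit

object JoinColumnOrderDemo {
  // Returns the column order before and after a left join on a Seq of column names
  def columnOrders(spark: SparkSession): (Seq[String], Seq[String]) = {
    import spark.implicits._

    // Same column order as data frame 1 (all values as strings, like the question's schema)
    val mainDf = Seq(("4295876139", "134", "135", "1", "I|!|"))
      .toDF("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction|!|")

    // Shaped like deletedf: the two key columns plus a marker column
    val deletes = Seq(("4295000000", "999"))
      .toDF("OrganizationId", "InterimPeriodId")
      .withColumn("Delete", lit("delete"))

    val kept = mainDf.join(deletes, Seq("OrganizationId", "InterimPeriodId"), "left")
      .filter($"Delete".isNull)
      .drop("Delete")

    (mainDf.columns.toSeq, kept.columns.toSeq)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("join-column-order").getOrCreate()
    val (before, after) = columnOrders(spark)
    println(before.mkString(", ")) // OrganizationId, AnnualPeriodId, InterimPeriodId, InterimNumber, FFAction|!|
    println(after.mkString(", "))  // OrganizationId, InterimPeriodId, AnnualPeriodId, InterimNumber, FFAction|!|
    spark.stop()
  }
}
```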

You can correct that by selecting the columns back into their original order:

    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)

And your problem should be solved.
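To check the fix end to end, here is a small local sketch, assuming a Spark 2.x/3.x `spark-sql` dependency on the classpath. It uses one row from the main data, one inserted row taken from data frame 2, and a dummy delete key; `UnionOrderDemo`, `render` and `run` are illustrative names. It concatenates the union result in its own column order, as the question's `concat_ws` step does, once without and once with the `select` fix:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, concat_ws, lit}

object UnionOrderDemo {
  val cols = Seq("OrganizationId", "AnnualPeriodId", "InterimPeriodId", "InterimNumber", "FFAction|!|")

  // Concatenates each row in the DataFrame's own column order, like the question's concat_ws step
  def render(df: DataFrame): Seq[String] =
    df.select(concat_ws("|^|", df.columns.map(col): _*))
      .collect()
      .map(_.getString(0))
      .toSeq
      .sorted

  def run(spark: SparkSession): (Seq[String], Seq[String]) = {
    import spark.implicits._
    val mainDf   = Seq(("4295876139", "134", "135", "1", "I|!|")).toDF(cols: _*)
    val insertDf = Seq(("4295902451", "109", "110", "1", "I|!|")).toDF(cols: _*)
    val deletes  = Seq(("4295000000", "999")).toDF("OrganizationId", "InterimPeriodId")
      .withColumn("Delete", lit("delete"))

    // After this join, InterimPeriodId sits in front of AnnualPeriodId
    val kept = mainDf.join(deletes, Seq("OrganizationId", "InterimPeriodId"), "left")
      .filter($"Delete".isNull)
      .drop("Delete")

    // union matches by position: the main row comes out swapped, the inserted row does not
    val broken = render(kept.union(insertDf))
    // The fix: restore the original column order before the union
    val fixed = render(kept.select(mainDf.columns.map(col): _*).union(insertDf))
    (broken, fixed)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("union-order").getOrCreate()
    val (broken, fixed) = run(spark)
    broken.foreach(println)
    fixed.foreach(println)
    spark.stop()
  }
}
```

`Dataset.unionByName` (available since Spark 2.3) matches columns by name instead, but on its own it would make every row follow the reordered join layout, so the concatenated values would then be consistently swapped relative to the header. Restoring the order with `select` keeps the data, the union and the header in agreement.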

Ramesh Maharjan
  • Do you have any suggestions for this question? https://stackoverflow.com/questions/46703623/how-to-rename-spark-data-frame-output-file-in-aws-in-spark-scala –  Jan 11 '18 at 06:25