
I am trying to add a column when it exists in the XML schema, but I do not want to add it when the column is not present. This is what I am doing; I guess I am doing something wrong in checking the condition.

  val temp = tempNew1
  .withColumn("BookMark", when($"AsReportedItem.fs:BookMark".isNotNull or $"AsReportedItem.fs:BookMark" =!= "", 0))
  .withColumn("DocByteOffset", when($"AsReportedItem.fs:DocByteOffset".isNotNull or $"AsReportedItem.fs:DocByteOffset" =!= "", 0))
  .withColumn("DocByteLength", when($"AsReportedItem.fs:DocByteLength".isNotNull or $"AsReportedItem.fs:DocByteLength" =!= "", 0))
  .withColumn("EditedDescription", when($"AsReportedItem.fs:EditedDescription".isNotNull or $"AsReportedItem.fs:EditedDescription" =!= "", 0))
  .withColumn("EditedDescription", when($"AsReportedItem.fs:EditedDescription._VALUE".isNotNull or $"AsReportedItem.fs:EditedDescription._VALUE" =!= "", 0))
  .withColumn("EditedDescription_languageId", when($"AsReportedItem.fs:EditedDescription._languageId".isNotNull or $"AsReportedItem.fs:EditedDescription._languageId" =!= "", 0))
  .withColumn("ReportedDescription", when($"AsReportedItem.fs:ReportedDescription._VALUE".isNotNull or $"AsReportedItem.fs:ReportedDescription._VALUE" =!= "", 0))
  .withColumn("ReportedDescription_languageId", when($"AsReportedItem.fs:ReportedDescription._languageId".isNotNull or $"AsReportedItem.fs:ReportedDescription._languageId" =!= "", 0))
  .withColumn("FinancialAsReportedLineItemName_languageId", when($"FinancialAsReportedLineItemName._languageId".isNotNull or $"FinancialAsReportedLineItemName._languageId" =!= "", 0))
  .withColumn("FinancialAsReportedLineItemName", when($"FinancialAsReportedLineItemName._VALUE".isNotNull or $"FinancialAsReportedLineItemName._VALUE" =!= "", 0))
  .withColumn("PeriodPermId_objectTypeId", when($"PeriodPermId._objectTypeId".isNotNull or $"PeriodPermId._objectTypeId" =!= "", 0))
  .withColumn("PeriodPermId", when($"PeriodPermId._VALUE".isNotNull or $"PeriodPermId._VALUE" =!= "", 0))
  .drop($"AsReportedItem")

When the column is present this works fine, but when the column is not present in tempNew1 I get an error.

Basically I don't want to call withColumn at all if the tag is not found in the schema.

Something I am missing here. Please help me identify the issue.

The error I get is below:

Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'AsReportedItem.fs:BookMark' given input columns: [IsAsReportedCurrencySetManually,

I have also tried this:

    def hasColumn(df: DataFrame, path: String) = Try(df(path)).isSuccess

    val temp = tempNew1.withColumn("BookMark",
      when(hasColumn(tempNew1, "AsReportedItem.fs:BookMark") == true, $"AsReportedItem.fs:BookMark"))

But I am not able to make it work fully.
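For what it's worth, the Try-based check itself is sound; the catch is that it returns a plain Scala Boolean, so it has to drive an ordinary driver-side `if`, not sit inside Spark's `when` (which expects a `Column`). A minimal stand-in sketch of those semantics, using a `Map` in place of a DataFrame (all names here are illustrative):

```scala
import scala.util.Try

// stand-in for df(path): throws when the "column" is missing,
// just as Spark's df(...) throws AnalysisException
val fakeDf = Map("AsReportedItem" -> 1)
def column(path: String): Int = fakeDf(path)

def hasColumn(path: String): Boolean = Try(column(path)).isSuccess

// branch on the driver side with an ordinary if/else,
// not inside a when(...) expression
val label = if (hasColumn("AsReportedItem")) "present" else "absent"
```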

This is working, but how can I write it for all the columns?

val temp = if (hasColumn(tempNew1, "AsReportedItem")) {
      tempNew1
        .withColumn("BookMark", $"AsReportedItem.fs:BookMark")
        .withColumn("DocByteOffset", $"AsReportedItem.fs:DocByteOffset")
        .withColumn("DocByteLength", $"AsReportedItem.fs:DocByteLength")
        .withColumn("EditedDescription", $"AsReportedItem.fs:EditedDescription")
        .withColumn("EditedDescription", $"AsReportedItem.fs:EditedDescription._VALUE")
        .withColumn("EditedDescription_languageId", $"AsReportedItem.fs:EditedDescription._languageId")
        .withColumn("ReportedDescription", $"AsReportedItem.fs:ReportedDescription._VALUE")
        .withColumn("ReportedDescription_languageId", $"AsReportedItem.fs:ReportedDescription._languageId")
        .withColumn("FinancialAsReportedLineItemName_languageId", $"FinancialAsReportedLineItemName._languageId")
        .withColumn("FinancialAsReportedLineItemName", $"FinancialAsReportedLineItemName._VALUE")
        .withColumn("PeriodPermId_objectTypeId", $"PeriodPermId._objectTypeId")
        .withColumn("PeriodPermId", $"PeriodPermId._VALUE")
        .drop($"AsReportedItem")
    } else {
      tempNew1
        .withColumn("BookMark", lit(null))
        .withColumn("DocByteOffset", lit(null))
        .withColumn("DocByteLength", lit(null))
        .withColumn("EditedDescription", lit(null))
        .withColumn("EditedDescription", lit(null))
        .withColumn("EditedDescription_languageId", lit(null))
        .withColumn("ReportedDescription", lit(null))
        .withColumn("ReportedDescription_languageId", lit(null))
        .withColumn("FinancialAsReportedLineItemName_languageId", $"FinancialAsReportedLineItemName._languageId")
        .withColumn("FinancialAsReportedLineItemName", $"FinancialAsReportedLineItemName._VALUE")
        .withColumn("PeriodPermId_objectTypeId", $"PeriodPermId._objectTypeId")
        .withColumn("PeriodPermId", $"PeriodPermId._VALUE")
        .drop($"AsReportedItem")

    }

Adding the schema of the main dataframe:

root
 |-- DataPartition: string (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- PeriodId: long (nullable = true)
 |-- SourceId: long (nullable = true)
 |-- FinancialStatementLineItem_lineItemId: long (nullable = true)
 |-- FinancialStatementLineItem_lineItemInstanceKey: long (nullable = true)
 |-- StatementCurrencyId: long (nullable = true)
 |-- StatementTypeCode: string (nullable = true)
 |-- uniqueFundamentalSet: long (nullable = true)
 |-- AuditID: string (nullable = true)
 |-- EstimateMethodCode: string (nullable = true)
 |-- EstimateMethodId: long (nullable = true)
 |-- FinancialAsReportedLineItemName: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _languageId: long (nullable = true)
 |-- FinancialStatementLineItemSequence: long (nullable = true)
 |-- FinancialStatementLineItemValue: double (nullable = true)
 |-- FiscalYear: long (nullable = true)
 |-- IsAnnual: boolean (nullable = true)
 |-- IsAsReportedCurrencySetManually: boolean (nullable = true)
 |-- IsCombinedItem: boolean (nullable = true)
 |-- IsDerived: boolean (nullable = true)
 |-- IsExcludedFromStandardization: boolean (nullable = true)
 |-- IsFinal: boolean (nullable = true)
 |-- IsTotal: boolean (nullable = true)
 |-- PeriodEndDate: string (nullable = true)
 |-- PeriodPermId: struct (nullable = true)
 |    |-- _VALUE: long (nullable = true)
 |    |-- _objectTypeId: long (nullable = true)
 |-- ReportedCurrencyId: long (nullable = true)
 |-- StatementSectionCode: string (nullable = true)
 |-- StatementSectionId: long (nullable = true)
 |-- StatementSectionIsCredit: boolean (nullable = true)
 |-- SystemDerivedTypeCode: string (nullable = true)
 |-- SystemDerivedTypeCodeId: long (nullable = true)
 |-- Unit: double (nullable = true)
 |-- UnitEnumerationId: long (nullable = true)
 |-- FFAction|!|: string (nullable = true)
 |-- PartitionYear: long (nullable = true)
 |-- PartitionStatement: string (nullable = true)

Adding the schema for when the optional columns do appear in the data:

 |-- uniqueFundamentalSet: long (nullable = true)
 |-- AsReportedItem: struct (nullable = true)
 |    |-- fs:BookMark: string (nullable = true)
 |    |-- fs:DocByteLength: long (nullable = true)
 |    |-- fs:DocByteOffset: long (nullable = true)
 |    |-- fs:EditedDescription: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _languageId: long (nullable = true)
 |    |-- fs:ItemDisplayedNegativeFlag: boolean (nullable = true)
 |    |-- fs:ItemDisplayedValue: double (nullable = true)
 |    |-- fs:ItemScalingFactor: long (nullable = true)
 |    |-- fs:ReportedDescription: struct (nullable = true)
 |    |    |-- _VALUE: string (nullable = true)
 |    |    |-- _languageId: long (nullable = true)
 |    |-- fs:ReportedValue: double (nullable = true)
 |-- EstimateMethodCode: string (nullable = true)
 |-- EstimateMethodId: long (nullable = true)
 |-- FinancialAsReportedLineItemName: struct (nullable = true)
 |    |-- _VALUE: string (nullable = true)
 |    |-- _languageId: long (nullable = true)
 |-- FinancialLineItemSource: long (nullable = true)
Atharv Thakur
  • You can check the `columns` property of the `tempNew1` dataset for the existence of the `AsReportedItem.fs:BookMark` column, and call the `withColumn` conditionally based on the result. See (https://stackoverflow.com/questions/35904136/how-do-i-detect-if-a-spark-dataframe-has-a-column) for more details – Alex Savitsky Apr 23 '18 at 13:24
  • @AlexSavitsky but I have 10 such columns do I have to do it one by one ? – Atharv Thakur Apr 23 '18 at 13:34
  • Yes. You can, however, put your columns in a `Seq`, filter it against the dataset columns, and then fold your dataset using `withColumn`, to make it somewhat functional-style – Alex Savitsky Apr 23 '18 at 14:15
  • @AlexSavitsky I just tried with hasColumn but something is missing ..Can please look at the syntax if you can spare some time .. – Atharv Thakur Apr 23 '18 at 17:03
  • As @AlexSavitsky has pointed you to use foldleft, use the idea but the codes presented will not work. You will have to use his idea to play with struct columns that you have and you shall get it solved ;) – Ramesh Maharjan Apr 24 '18 at 03:03
  • @RameshMaharjan yes it does not work as it is ..I am trying that out... – Atharv Thakur Apr 24 '18 at 04:43
  • @RameshMaharjan cant we do something in simpler way like with hasColumn ? – Atharv Thakur Apr 24 '18 at 05:12
  • you could have done it in simple ways if your table didn't have struct types. but your columns are mostly structs isn't it? – Ramesh Maharjan Apr 24 '18 at 05:15
  • @RameshMaharjan yes ... – Atharv Thakur Apr 24 '18 at 05:17
  • @RameshMaharjan I have updated my code which is actually working but its a very dumb way to make it work,but easy for me to understand .. – Atharv Thakur Apr 24 '18 at 05:30
  • firstly I am not understanding your requirement that you want to populate 0 for not null value or populate with whatever is the value in that struct element? what is your requirement? and secondly do you want to create all those new columns even though you don't have those elements in original struct field? can you clarify those two points – Ramesh Maharjan Apr 24 '18 at 05:36
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/169640/discussion-between-atharv-thakur-and-ramesh-maharjan). – Atharv Thakur Apr 24 '18 at 05:37

2 Answers


Putting it as an answer, as it's getting too big for comments.

Assuming you have a collection of the columns you'd like to add:

val cols = Seq("BookMark")

you would need to repeatedly call withColumn on your original DataFrame, assigning the result to a new DataFrame. There's a functional operation that does just that, called fold:

import scala.util.Try

val result = cols.foldLeft(tempNew1)((df, name) =>
  df.withColumn(name,
    if (Try(df(s"AsReportedItem.fs:$name")).isSuccess)
      col(s"AsReportedItem.fs:$name")
    else lit(null)))

foldLeft takes the initial value (tempNew1 in your case) and calls the provided function for each element in cols, assigning the result to a new DataFrame each time.
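To see how foldLeft threads the accumulator, here is a plain-Scala sketch where a String stands in for the DataFrame (illustrative only, no Spark required):

```scala
// each step wraps the accumulator, exactly as each withColumn
// returns a new DataFrame built from the previous one
val cols = Seq("BookMark", "DocByteOffset")
val result = cols.foldLeft("tempNew1")((df, name) => s"withColumn($df, $name)")
// result: "withColumn(withColumn(tempNew1, BookMark), DocByteOffset)"
```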

Alex Savitsky

I am going to show you a general way to apply the logic to the AsReportedItem struct column (the code is commented inline for clarity).

import scala.util.Try
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructField, StructType}

//required column names, even when the elements are not present in the AsReportedItem struct column
val requiredAsReportedItemColumns = Array("BookMark", "DocByteOffset", "DocByteLength", "EditedDescription", "EditedDescription_languageId", "ReportedDescription", "ReportedDescription_languageId")

//recursively collect the field paths of a struct, descending into nested structs
def getFields(parent: String, schema: StructType): Seq[String] = schema.fields.flatMap {
  case StructField(name, t: StructType, _, _) => getFields(parent + name + ".", t)
  case StructField(name, _, _, _) => Seq(s"$parent$name")
}

//if the struct column is present, get its fields (nested ones included);
//substring(3) drops the "fs:" prefix
val AsReportedItemColumns = if(tempNew1.columns.contains("AsReportedItem")) getFields("", tempNew1.select("AsReportedItem.*").schema).toArray.map(x => x.substring(3, x.length)) else Array.empty[String]

//difference between the required columns and the elements actually present in the AsReportedItem struct column
val notInAsReportedItemColumns = requiredAsReportedItemColumns.diff(AsReportedItemColumns.map(x => x.replace(".", "")))

//populate the columns whose elements are present in the AsReportedItem struct column
val temp_for_AsReportedItem = AsReportedItemColumns.foldLeft(tempNew1){(tempdf, name) => tempdf.withColumn(name.replace(".", ""), col(s"AsReportedItem.fs:$name"))}
//populate nulls for the columns that are not present in the AsReportedItem struct column
val final_AsReportedItem = notInAsReportedItemColumns.foldLeft(temp_for_AsReportedItem){(tempdf, name)=> tempdf.withColumn(name, lit(null))}.drop("AsReportedItem")

Apply the same logic to the other two struct columns, FinancialAsReportedLineItemName and PeriodPermId, but on the transformed dataframe, i.e. on final_AsReportedItem and not on tempNew1.
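The diff step above can be sketched in plain Scala, with illustrative column names standing in for the real schema:

```scala
// required output columns vs. the fields actually found in the struct
// (dots removed the same way the answer's code does it);
// diff yields the columns that must be null-filled
val required = Array("BookMark", "DocByteOffset", "EditedDescription_languageId")
val present  = Seq("BookMark", "EditedDescription._languageId")
val missing  = required.diff(present.map(_.replace(".", "")))
// missing contains only "DocByteOffset"
```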

Credit to https://stackoverflow.com/a/47104148/5880706

Ramesh Maharjan
  • Getting error `Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve 'AsReportedItem.*' give input columns 'PartitionYear, EstimateMethodId, EstimateMethodCode` – Atharv Thakur Apr 24 '18 at 06:21
  • I have added that in the question – Atharv Thakur Apr 24 '18 at 06:29
  • Ok so this is not throwing any error ..But I need to explode after getting the columns .. – Atharv Thakur Apr 24 '18 at 06:45
  • I am searching for an example sorry for delay – Atharv Thakur Apr 24 '18 at 07:03
  • So this the issue ...`Exception in thread "main" org.apache.spark.sql.AnalysisException: cannot resolve '(`AsReportedItem`.`fs:EditedDescription` = '')' due to data type mismatch: differing types in '(`AsReportedItem`.`fs:EditedDescription` = '')' (struct<_VALUE:string,_languageId:bigint> and string).;;` because when columns appears the type is different – Atharv Thakur Apr 24 '18 at 14:59
  • Yes I tried removing that earlier but still getting error ..`No such struct field BookMark in fs:BookMark, fs:DocByteLength` – Atharv Thakur Apr 24 '18 at 16:06
  • But this code will not work for `EditedDescription` which is again struct type inside AsReportedItem..I mean I will get that as an array so do I have to explode that ? ..This is what I was taking about earlier and was searching for to generate this scenario – Atharv Thakur Apr 24 '18 at 16:41
  • you should look at https://stackoverflow.com/questions/47103823/how-can-i-explode-a-struct-in-a-dataframe-without-hard-coding-the-column-names – Ramesh Maharjan Apr 25 '18 at 04:28
  • sorry for the frequent question ..I am going to try this out ..Thanks – Atharv Thakur Apr 25 '18 at 05:03