
I have the schema below:

root
 |-- DataPartition: long (nullable = true)
 |-- TimeStamp: string (nullable = true)
 |-- _action: string (nullable = true)
 |-- env:Data: struct (nullable = true)
 |    |-- _type: string (nullable = true)
 |    |-- al:FundamentalAnalytic: struct (nullable = true)
 |    |    |-- _analyticItemInstanceKey: long (nullable = true)
 |    |    |-- _financialPeriodEndDate: string (nullable = true)
 |    |    |-- _financialPeriodType: string (nullable = true)
 |    |    |-- _isYearToDate: boolean (nullable = true)
 |    |    |-- _lineItemId: long (nullable = true)
 |    |    |-- al:AnalyticConceptCode: string (nullable = true)
 |    |    |-- al:AnalyticConceptId: long (nullable = true)
 |    |    |-- al:AnalyticIsEstimated: boolean (nullable = true)
 |    |    |-- al:AnalyticValue: struct (nullable = true)
 |    |    |    |-- _VALUE: double (nullable = true)
 |    |    |    |-- _currencyId: long (nullable = true)
 |    |    |-- al:AuditID: string (nullable = true)
 |    |    |-- al:FinancialPeriodTypeId: long (nullable = true)
 |    |    |-- al:FundamentalSeriesId: struct (nullable = true)
 |    |    |    |-- _VALUE: long (nullable = true)
 |    |    |    |-- _objectType: string (nullable = true)
 |    |    |    |-- _objectTypeId: long (nullable = true)
 |    |    |-- al:InstrumentId: long (nullable = true)
 |    |    |-- al:IsAnnual: boolean (nullable = true)
 |    |    |-- al:TaxonomyId: long (nullable = true)

This data comes from XML files whose structure varies frequently. I want to process only the tag that contains env:Data.sr:Source.*, so I have written the code below:

val dfType = dfContentItem
  .select(
    getDataPartition($"DataPartition").as("DataPartition"),
    $"TimeStamp".as("TimeStamp"),
    $"env:Data.sr:Source.*",
    getFFActionParent($"_action").as("FFAction|!|")
  )
  .filter($"env:Data.sr:Source._organizationId".isNotNull)
dfType.show(false)

But this only works when sr:Source is present in the schema; otherwise I get the exception below:

Exception in thread "main" org.apache.spark.sql.AnalysisException: No such struct field sr:Source in _type, cr:TRFCoraxData, fun:Fundamental, md:Identifier, md:Relationship;

To get around that, I added a null check for sr:Source, but that isn't working for me either: the check itself throws the same error.

Basically, what I need is: if env:Data.sr:Source.* is null, exit processing for this tag and start processing the next one.
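One way to get that behaviour, sketched here under assumptions: the AnalysisException is raised when Spark analyzes the `select`, before any row is read, so a value-level null check can't prevent it, but inspecting the DataFrame's schema first can. `SchemaPaths.hasPath` is a hypothetical helper, not a Spark API. It descends into arrays of structs (since sr:Source may be an array), and it matches names case-sensitively, whereas Spark resolves column names case-insensitively by default.

```scala
import org.apache.spark.sql.types.{ArrayType, DataType, StructType}

// Hypothetical helper (not a Spark API): checks whether a dotted field path such as
// "env:Data.sr:Source" exists in a schema, so a missing tag can be skipped up front.
object SchemaPaths {
  def hasPath(schema: StructType, path: String): Boolean = {
    // Walk one path segment at a time through nested types.
    def loop(dt: DataType, parts: List[String]): Boolean = (dt, parts) match {
      case (_, Nil)                      => true                        // every segment matched
      case (a: ArrayType, _)             => loop(a.elementType, parts)  // look inside array elements
      case (s: StructType, head :: tail) =>
        // Exact, case-sensitive name match on the current struct level.
        s.fields.find(_.name == head).exists(f => loop(f.dataType, tail))
      case _                             => false                       // primitive reached too early
    }
    loop(schema, path.split('.').toList)
  }
}
```

With it, the question's query could be guarded with `if (SchemaPaths.hasPath(dfContentItem.schema, "env:Data.sr:Source")) { ... }`, and the `else` branch simply moves on to the next tag.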

Anupam

1 Answer


An org.apache.spark.sql.AnalysisException is usually thrown when something is wrong in the query itself, so I'm pretty sure it's because you're trying to filter on sr:Source on the occasions where it is null.

Error handling in Scala is typically done with Option (there's a good article on it) or Try.
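For reference, a minimal pure-Scala sketch of those two types, independent of Spark (`OptionTryDemo` and its methods are illustrative names): `Option(x)` turns a null reference into `None`, and `Try { ... }` captures a non-fatal exception as a `Failure` that can be pattern-matched to decide whether to continue or skip.

```scala
import scala.util.{Failure, Success, Try}

// Illustrative names only.
object OptionTryDemo {
  // Option(...) wraps a possibly-null reference: null becomes None, anything else Some(value).
  def nonNull(value: String): Option[String] = Option(value)

  // Try(...) runs the block and captures any non-fatal exception as a Failure.
  def safeToLong(raw: String): Try[Long] = Try(raw.trim.toLong)

  // Pattern-match on the Try to decide whether to continue or skip.
  def describe(raw: String): String = safeToLong(raw) match {
    case Success(n) => s"parsed $n"
    case Failure(e) => s"skipped: ${e.getClass.getSimpleName}"
  }
}
```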

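import org.apache.spark.sql.functions.udf

// Returns None (stored by Spark as null) when organizationId is null, Some(true) otherwise
def handleNulls(organizationId: String): Option[Boolean] =
  Option(organizationId).map(_ => true)

// The UDF's input type must match handleNulls' parameter type (String, not Integer)
val betterNullsUdf = udf[Option[Boolean], String](handleNulls)

// Assumes import spark.implicits._ (for $"...") and the question's getDataPartition / getFFActionParent UDFs
val dfType = dfContentItem
  .select(
    getDataPartition($"DataPartition").as("DataPartition"),
    $"TimeStamp".as("TimeStamp"),
    // A UDF takes individual columns, not a star expansion, so pass the field itself
    betterNullsUdf($"env:Data.sr:Source._organizationId").as("hasOrganizationId"),
    getFFActionParent($"_action").as("FFAction|!|")
  )
  .filter($"hasOrganizationId".isNotNull)
dfType.show(false)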
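Because Spark analyzes a DataFrame query eagerly, the AnalysisException from the question surfaces at the `select` call itself, so another option is to wrap the whole query in Try and treat that exception as "skip this tag". A sketch with a hypothetical `Skip.skipOn` helper (not part of Spark or the Scala library); it rethrows any other exception type so unrelated bugs aren't silently hidden.

```scala
import scala.reflect.ClassTag
import scala.util.{Failure, Success, Try}

// Hypothetical helper (not part of Spark or the Scala library).
object Skip {
  // Evaluates `block` once: Some(result) on success, None if it throws an E,
  // and any other non-fatal exception is rethrown so unrelated failures stay visible.
  def skipOn[E <: Throwable, A](block: => A)(implicit tag: ClassTag[E]): Option[A] =
    Try(block) match {
      case Success(value)                               => Some(value)
      case Failure(e) if tag.runtimeClass.isInstance(e) => None
      case Failure(other)                               => throw other
    }
}
```

With `org.apache.spark.sql.{AnalysisException, DataFrame}` imported, `Skip.skipOn[AnalysisException, DataFrame](dfContentItem.select(...))` would return `None` when sr:Source is missing. Note that it would equally swallow any other analysis error in that query, such as a misspelled column name.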
Steven Black
  • I am getting `not enough arguments for method apply: (x: A)Some[A] in object Some. Unspecified value parameter x.` at `Some()`. Also, env:Data.sr:Source.* is an array type in my data – Anupam Apr 10 '18 at 04:23
  • Also, Steven: in my case, if env:Data.sr:Source.* is null, I want to exit processing the job for that tag. I don't think this can handle that – Anupam Apr 10 '18 at 04:25