I ran a Spark job that takes inputs from two sources, something like: /home/hadoop/base/hourly/{input1/20190701/,input2/20190701/}
The problem is that these two sources have different schemas. The Spark job's final status is successful, but it fails to process the data because of the mismatch. Since the status is reported as success, the issue went unnoticed in our clusters for a while. Is there a way to make the Spark job fail instead of bailing out successfully?
Here is a snippet of the error from the task log for reference:
Job aborted due to stage failure: Task 1429 in stage 2.0 failed 4 times, most recent failure: Lost task 1429.3 in stage 2.0 (TID 1120, 1.mx.if.aaa.com, executor 64): java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
at parquet.column.Dictionary.decodeToLong(Dictionary.java:52)
at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToLong(ParquetDictionary.java:36)
at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:364)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)
Sample of the code I ran:
val ckall = spark.read.parquet("/home/hadoop/base/hourly/{input1/20190701/,input2/20190701/}")
ckall.write.parquet("/home/hadoop/output")
Ideally, I expect the final status of the Spark job to be a failure.
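
One workaround I am considering (just a sketch, assuming the mismatch can be caught up front; the separate reads and the schema check are my own additions, not what the job currently does) is to read each input on its own, compare the schemas, and throw from the driver so the application exits non-zero:

// Hypothetical guard: read each input separately and compare schemas
// before combining them, so a mismatch fails the driver immediately
// instead of surfacing later as a task-level decode error.
val input1 = spark.read.parquet("/home/hadoop/base/hourly/input1/20190701/")
val input2 = spark.read.parquet("/home/hadoop/base/hourly/input2/20190701/")

if (input1.schema != input2.schema) {
  // An uncaught exception on the driver ends the application as FAILED
  throw new IllegalStateException(
    s"Schema mismatch between inputs:\n${input1.schema.treeString}\n${input2.schema.treeString}")
}

val ckall = input1.union(input2)  // safe to union once the schemas match
ckall.write.parquet("/home/hadoop/output")

This would at least surface the problem early, but I would still prefer a way for the job status itself to reflect the failure.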