
I ran a Spark job that takes inputs from two sources, something like: /home/hadoop/base/hourly/{input1/20190701/,input2/20190701/}

The problem is that these two sources have different schemas. The Spark job's final status is successful, but because of this issue the data does not actually get processed. Since the status was successful, the problem went unnoticed in our clusters for a while. Is there a way to make the Spark job fail instead of bailing out successfully?

Here is a snippet of the error from the task log, for reference:

Job aborted due to stage failure: Task 1429 in stage 2.0 failed 4 times, most recent failure: Lost task 1429.3 in stage 2.0 (TID 1120, 1.mx.if.aaa.com, executor 64): java.lang.UnsupportedOperationException: parquet.column.values.dictionary.PlainValuesDictionary$PlainIntegerDictionary
    at parquet.column.Dictionary.decodeToLong(Dictionary.java:52)
    at org.apache.spark.sql.execution.datasources.parquet.ParquetDictionary.decodeToLong(ParquetDictionary.java:36)
    at org.apache.spark.sql.execution.vectorized.OnHeapColumnVector.getLong(OnHeapColumnVector.java:364)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:439)

Sample of the code I ran:

// read both hourly inputs via one glob pattern, then write them back out
val ckall = spark.read.parquet("/home/hadoop/base/hourly/{input1/20190701/,input2/20190701/}")
ckall.write.parquet("/home/hadoop/output")

Ideally, I expect the final status of the Spark job to be a failure.

CSUNNY

1 Answer


I had a similar issue, only to find out it was all my fault.

Basically, my app's starting point looked like this:

import org.apache.spark.sql.SparkSession
import org.slf4j.LoggerFactory

object MyApp extends App {

  private val logger = LoggerFactory.getLogger(getClass)
  logger.info(s"Starting $BuildInfo")

  val spark: SparkSession = SparkSession.builder.appName("name").getOrCreate()
  processing(spark)

  spark.stop()
}

And all seemed fine. But processing(spark) was actually wrapped in a Try, so it did not return Unit but Try[Unit]. Everything executed fine inside, but if an error occurred, it was caught there and never propagated.
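
In other words, the shape was roughly this (a hypothetical reconstruction of what I describe above; processing and the paths are placeholders, not the original code):

import scala.util.Try
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical reconstruction: the Spark actions run inside Try, so a failed
// read/write becomes a Failure value rather than a thrown exception. Because
// the returned Try[Unit] is never inspected, the failure is silently
// discarded and the application still finishes as successful.
def processing(spark: SparkSession): Try[Unit] = Try {
  val ckall: DataFrame = spark.read.parquet("/some/input")   // placeholder path
  ckall.write.parquet("/some/output")                        // placeholder path
}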

I simply stopped catching the errors and now the app fails like a charm :-).
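
If you do want to keep a Try around the processing (for cleanup, say), a sketch of an alternative, assuming the processing signature above, is to rethrow the captured failure so it still escapes the driver:

import scala.util.{Failure, Success}

// Sketch: surface the captured failure instead of discarding it. An uncaught
// exception leaving the driver makes the application finish as failed.
processing(spark) match {
  case Success(_) =>
    spark.stop()
  case Failure(e) =>
    spark.stop()   // clean up first
    throw e        // rethrow so the job is reported as failed
}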

Atais