
I'm reading a large number of CSVs from S3 (everything under a key prefix) and creating a strongly-typed Dataset.

val events: DataFrame = cdcFs.getStream()
events
  .withColumn("event", lit("I"))
  .withColumn("source", lit(sourceName))
  .as[TradeRecord]

where TradeRecord is a case class that the SparkSession's implicit encoders can normally deserialize into without issue. However, for a certain batch, a record is failing to deserialize. Here's the error (stack trace omitted):

Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "deal")
- root class: "com.company.trades.TradeRecord"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

deal is a field of TradeRecord that should never be null in the source data (the S3 objects), so it's not an Option.
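
For what it's worth, the error itself is easy to reproduce with any primitive field that ends up null. A minimal sketch (the field name deal is kept; the SparkSession setup and everything else is illustrative):

import org.apache.spark.sql.SparkSession

object NullDealRepro {
  case class Rec(deal: Long) // primitive Long cannot represent null

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.master("local[*]").getOrCreate()
    import spark.implicits._

    // an empty value cast to long becomes null, and .as[Rec] blows up on it at collect time
    val df = Seq("1", "").toDF("deal").select($"deal".cast("long").as("deal"))
    df.as[Rec].collect() // java.lang.NullPointerException: Null value appeared in non-nullable field
  }
}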

Unfortunately the error message doesn't give me any clue as to what the CSV data looks like, or even which CSV file it's coming from. The batch consists of hundreds of files, so I need a way to narrow this down to at most a few files to investigate the issue.

lfk
  • Possible duplicate of [Spark 2 Dataset Null value exception](https://stackoverflow.com/questions/41665183/spark-2-dataset-null-value-exception) – 10465355 Dec 17 '18 at 00:59
  • Different questions. I understand what the error means. I need to see what the data that is coming in looks like (is it missing a column? is there an extra comma in the csv?), which means I need to know which file contains the broken record. – lfk Dec 17 '18 at 01:12
  • If you want to narrow things down I'd recommend skipping `as` and using [`input_file_name`](https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.functions$@input_file_name():org.apache.spark.sql.Column) to pinpoint the exact source. – 10465355 Dec 17 '18 at 01:17
  • Look up the option called columnNameOfCorruptRecord. It will mark corrupt records using this column. You need to add a field to your case class or schema, and set it as the option's value. I have one other idea too, using the spark-sql function called from_file, which I'll try to explain later. – emran Dec 17 '18 at 01:18

2 Answers


As suggested by user10465355, you can load the data:

val events: DataFrame = ???

Filter the records where deal is null:

import spark.implicits._ // needed for the $"..." column syntax

val mismatched = events.where($"deal".isNull)

Add the source file name:

import org.apache.spark.sql.functions.input_file_name

val tagged = mismatched.withColumn("_file_name", input_file_name)
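
From there, a quick way to see which files are involved (a sketch, assuming the problematic batch is re-read as a static DataFrame rather than as a stream):

tagged
  .select($"_file_name")
  .distinct()
  .show(truncate = false) // prints the distinct source files that contain null deals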

Optionally add the chunk and offset:

import org.apache.spark.sql.functions.{spark_partition_id, monotonically_increasing_id, shiftLeft, shiftRight}

tagged
  .withColumn("chunk", spark_partition_id())
  .withColumn(
    "offset",
    // monotonically_increasing_id puts the partition id in the upper 31 bits and the
    // record number within the partition in the lower 33 bits; shifting right and back
    // left by 33 clears the lower bits, so the subtraction leaves the per-partition offset
    monotonically_increasing_id - shiftLeft(shiftRight(monotonically_increasing_id, 33), 33))
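
Once a file has been identified, it can be pulled back in on its own as plain text to inspect the raw rows (a sketch; suspectPath stands for whatever _file_name reported, and treating deal as the first CSV column is an assumption):

// hypothetical follow-up: read one offending file as raw text and show the
// lines whose first field (assumed here to be deal) is empty
val raw = spark.read.textFile(suspectPath)
raw
  .filter(line => line.split(",", -1).headOption.exists(_.trim.isEmpty))
  .show(truncate = false)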

Here's the solution I came up with (I'm using Spark Structured Streaming):

import org.apache.spark.sql.functions.{input_file_name, lit}

val stream = spark.readStream
  .format("csv")
  .schema(schema) // a StructType defined elsewhere
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "corruptRecord")
  .load(path)

// If debugging, check for any corrupted CSVs
if (log.isDebugEnabled) { // org.apache.spark.internal.Logging trait 
  import spark.implicits._
  stream
    .filter($"corruptRecord".isNotNull)
    .withColumn("input_file", input_file_name)
    .select($"input_file", $"corruptRecord")
    .writeStream
    .format("console")
    .option("truncate", false)
    .start()
}

val events = stream
  .withColumn("event", lit("I"))
  .withColumn("source", lit(sourceName))
  .as[TradeRecord]

Basically, if the Spark log level is set to DEBUG or lower, the DataFrame is checked for corrupted records and any such records are printed to the console together with the name of the file they came from. The program then goes on to convert the DataFrame to the strongly-typed Dataset[TradeRecord], which still fails on the bad batch, but by that point the offending files have been identified.
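
One detail worth noting: for columnNameOfCorruptRecord to capture anything, the corruptRecord column has to be declared in the user-supplied schema itself, as a nullable StringType. A sketch of what schema might contain, where deal comes from the question and the other data field is purely hypothetical:

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("deal", LongType, nullable = true),            // field from TradeRecord
  StructField("tradeDate", StringType, nullable = true),     // hypothetical field, for illustration
  StructField("corruptRecord", StringType, nullable = true)  // holds the raw text of unparseable rows
))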

lfk