
I am working with Azure Databricks and PySpark 2.4.3, trying to build a robust approach to importing files from Blob storage into the cluster. Things mostly work, but parsing is not raising errors the way I expect.

I have a 7 GB CSV file that I know contains a number of records with issues that are causing rows to be skipped (found by reconciling the record count in the output Parquet file written from the DataFrame against the source CSV). I am attempting to use the badRecordsPath option, but no output is being generated there (that I can find). Can anyone share advice on how to troubleshoot file loading when there is bad data, and on how to build a robust process that does not handle parsing errors permissively in the future?
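
The first troubleshooting step I've tried is simply listing the badRecordsPath location after the load to confirm whether anything was written. A minimal check (my understanding is that Databricks writes bad records under a timestamped subdirectory, but that layout is an assumption on my part):

try:
    # List whatever the reader wrote under badRecordsPath, if anything
    for entry in dbutils.fs.ls("/tmp/badRecordsPath"):
        print(entry.path)
except Exception as e:
    print("badRecordsPath appears to be empty or missing:", e)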

One issue already tackled is embedded newlines, where I've found the wholeFile and multiline options have helped - but I am now having trouble getting insight into which records are not being accepted.

The Python code that I am using to load the file looks like this.

myDf = spark.read.format("csv") \
    .option("inferSchema", "true") \
    .option("header", "true") \
    .option("wholeFile", "true") \
    .option("multiline", "true") \
    .option("ignoreLeadingWhiteSpace", "true") \
    .option("ignoreTrailingWhiteSpace", "true") \
    .option("parserLib", "univocity") \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("badRecordsPath", "/tmp/badRecordsPath") \
    .load(BLOB_FILE_LOCATION)

What I see is that about half a million records out of more than 10 million are being dropped. I am currently unable to easily tell which ones they are, or even that failures are occurring, or what they are (without exporting and comparing data, which would be OK for a one-time load but not acceptable for the production system). I've also tried the other read modes without luck; it always seems to behave as if DROPMALFORMED were set, which is not the case (even when setting mode to "FAILFAST" as an experiment).
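
For reference, this is the kind of PERMISSIVE read I would expect to surface the bad rows via a corrupt-record column. This is only a sketch: the schema fields here are placeholders (my real file has many more columns), and my understanding is that for CSV the corrupt-record column is only populated when it is part of an explicitly supplied schema, and that the parsed result has to be cached before you can filter on that column alone.

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Placeholder schema - the real file has many more columns
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("_corrupt_record", StringType(), True)  # must be included to capture bad rows
])

checkDf = spark.read.format("csv") \
    .schema(schema) \
    .option("header", "true") \
    .option("multiline", "true") \
    .option("quote", '"') \
    .option("escape", '"') \
    .option("mode", "PERMISSIVE") \
    .option("columnNameOfCorruptRecord", "_corrupt_record") \
    .load(BLOB_FILE_LOCATION)

checkDf.cache()  # Spark disallows queries that reference only the corrupt-record column unless the result is cached
badRows = checkDf.filter(checkDf["_corrupt_record"].isNotNull())
print(badRows.count())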

Many thanks for any insight / advice.

rickkoter
  • It seems I am having a similar use case; see the link if it helps: https://stackoverflow.com/questions/57918129/parsing-a-csv-file-in-pyspark-using-spark-inbuilt-functions-or-methods – vikrant rana Sep 13 '19 at 11:33
  • I see - including a step to perform additional post-load filtering, using select for valid data (maybe or maybe not using UDFs). We will have to explore patterns in the loaded data to look for opportunities to know more certainly which rows are good and which are bad. Generically, a validation step should be part of the pipeline, with good and bad data moved to two separate DataFrames/files. Thanks! – rickkoter Sep 13 '19 at 15:49
  • It has an answer now. We need to load it without a schema, then apply checks and validations, and then split the bad and good records into two separate files. This does not fully address your problem, but it looks like it has to be done this way. Share your views and thoughts on this. – vikrant rana Sep 18 '19 at 00:43
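
A rough sketch of the load-without-a-schema-then-split approach described in the comments above. The column names ("id", "event_date") and the validation rule are purely illustrative placeholders; the real checks would come from exploring patterns in the loaded data.

from pyspark.sql import functions as F

# Read every column as a string so nothing is lost to type coercion
rawDf = spark.read.format("csv") \
    .option("header", "true") \
    .option("multiline", "true") \
    .option("quote", '"') \
    .option("escape", '"') \
    .load(BLOB_FILE_LOCATION)

# Hypothetical validation: "id" must be numeric and "event_date" must parse as a date
is_valid = F.col("id").cast("int").isNotNull() & \
           F.to_date(F.col("event_date"), "yyyy-MM-dd").isNotNull()

goodDf = rawDf.filter(is_valid)
badDf = rawDf.filter(~is_valid)

goodDf.write.mode("overwrite").parquet("/tmp/good_records")
badDf.write.mode("overwrite").parquet("/tmp/bad_records")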
