Spark 2.0.0+

I am using the built-in csv data source to parse several .csv files (e.g. input_file_01.csv, input_file_02.csv, etc.) into a single Spark DataFrame df:

df = spark.read.csv('input_file_*.csv',
         header = True,
         inferSchema = False, # some column names carry stray apostrophes
         sep=';',
         multiLine = False,
         enforceSchema = True,
         schema = jsonSchema,
         mode='FAILFAST' # debug / DROPMALFORMED
)

They should all have the same schema (jsonSchema).

from pyspark.sql.types import StructType, StructField, StringType
jsonSchema = StructType([
    StructField("F1", StringType()),
    StructField("F2", StringType()),
    StructField("F3", StringType())
])

Problem:

The files should all be the same, but some have malformed headers:

parsed_input_file_01.take(3)

>>>[u'"F1","F2","F3"',
    u'"a","b","c"',
    u'"d","e","f"']
parsed_input_file_17.take(3) # malformed file

>>>[u'"`F1`","`F2`","`F3`"', # all columns malformed: '`F1`'
    u'"a","b","c"',
    u'"d","e","f"']
parsed_input_file_945.take(3) # malformed file

>>>[u'"F1","F2","`F3`"', # malformed third "column": '`F3`'
    u'"a","b","c"',
    u'"d","e","f"']

Traceback:

Py4JJavaError: An error occurred while calling o9210.fromDF.:
org.apache.spark.sql.AnalysisException: cannot resolve '`F1`' given input columns: [F1, F2, F3];;
...

Question:

Given that I do not want to drop the data of entire files (e.g. via .option('mode', 'DROPMALFORMED'), if that would even work here), I am looking for a performant way of reading all the data with (Py)Spark.

My approach would be to parse just the header of each file separately, identify the malformed files, and then either delete the unwanted apostrophes (') or change the encoding.

  1. How can I identify the malformed files (or a different encoding) in Spark?
  2. Is there a performant way of parsing the headers first and sanitizing the malformed files by regex'ing the header in Spark?
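To make the two questions concrete, here is a minimal sketch of the header check and clean-up in plain Python, so it can be tried without a cluster. The names `EXPECTED`, `parse_header`, `clean_header` and `header_is_malformed` are hypothetical and not part of any Spark API, and the sketch assumes that the only damage is stray backticks or apostrophes around otherwise correct names. The comma separator matches the sample lines shown above.

```python
import csv
import re

EXPECTED = ["F1", "F2", "F3"]  # known column names, as in jsonSchema

# Assumption: stray backticks and apostrophes are the only unwanted characters
_STRAY = re.compile(r"[`']")

def parse_header(line, sep=","):
    """Split one raw header line into field names, honouring double quotes."""
    return next(csv.reader([line], delimiter=sep, quotechar='"'))

def clean_header(line, sep=","):
    """Return the field names with stray backticks/apostrophes removed."""
    return [_STRAY.sub("", name).strip() for name in parse_header(line, sep)]

def header_is_malformed(line, expected=EXPECTED, sep=","):
    """True if the raw field names differ from the expected ones."""
    return parse_header(line, sep) != list(expected)

# First line of each file, copied from the examples in the question
headers = {
    "input_file_01.csv": '"F1","F2","F3"',
    "input_file_17.csv": '"`F1`","`F2`","`F3`"',
    "input_file_945.csv": '"F1","F2","`F3`"',
}
bad = sorted(name for name, line in headers.items() if header_is_malformed(line))
```

Here `bad` collects the files whose header would need rewriting, and `clean_header` gives the names such a header should have had. How the first line of each file is collected (on the driver or inside a Spark job) is left open.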

Conditions:

  • a general option like quote or escape won't fix the issue, as the headers change arbitrarily.
  • the schema should not be inferred, as the desired schema (the correct general column names and datatypes) is known.
  • all csv files need to be combined in one single dataframe.
  • Provide the schema on read. Read this answer: https://stackoverflow.com/a/34528938/7441537 – André Machado Nov 22 '19 at 18:17
  • General schema is provided. But _some_ files are not read correctly as the `StructField` name mismatches (because of the unwanted '). See example of file 17 header line. – hard Nov 22 '19 at 18:29

1 Answer

You probably need to drop down to an RDD here, skip the header line of every file, and apply the known schema yourself:

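dir_path = 'data'
# Read every file whole as a (path, content) pair
data_rdd = spark.sparkContext.wholeTextFiles(dir_path)
# Drop the first line (the header) of each file and keep the data lines
data_flat_map = data_rdd.flatMap(lambda a: a[1].splitlines()[1:])
# Naive split into fields; strip the surrounding double quotes
d = data_flat_map.map(lambda a: [f.strip('"') for f in a.split(",")])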

from pyspark.sql.types import StructType, StructField, StringType
schema = StructType([StructField("F1", StringType(), True),
    StructField("F2", StringType(), True),
    StructField("F3", StringType(), True)
])
df = spark.createDataFrame(d, schema)
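Splitting on a bare comma breaks as soon as a quoted field contains the separator. As a hedged alternative, the per-file step can be written as a small function built on Python's csv module. `rows_without_header` is a hypothetical helper name, and the sample text is taken from the question's malformed file 17.

```python
import csv
import io

def rows_without_header(content, sep=","):
    """Parse one file's text as CSV and return its data rows, skipping line 1."""
    reader = csv.reader(io.StringIO(content), delimiter=sep, quotechar='"')
    rows = [row for row in reader if row]  # ignore blank lines
    return rows[1:]  # the header is dropped, so its spelling no longer matters

# Content of a file with a malformed header, as shown in the question
sample = '"`F1`","`F2`","`F3`"\n"a","b","c"\n"d","e","f"\n'
rows = rows_without_header(sample)
```

Assuming the (path, content) pairs produced by wholeTextFiles, such a function could be passed to flatMap as `lambda kv: rows_without_header(kv[1])`. Keep in mind that wholeTextFiles reads each file as a single record, so this only suits files that fit comfortably in executor memory.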

Let me know if this is what you are looking for.

Thanks, Hussain Bohra

  • The quotes are not the problem. The difference between files is the **arbitrary** insertion of `'` (apostrophes) in _some_ headers. A general `quote` or `escaping` option won't fix the issue. Also, the schema should not be inferred, as I know the desired schema (and the correct general column names). Also, all csv files need to be combined in one single dataframe. – hard Nov 22 '19 at 18:04
  • I see, sorry, I didn't read the problem statement carefully enough earlier. It seems you might need to make use of RDDs here. First read your data into an RDD, ignore the first line (the header) of each file, and create the data frame from the result. I will also update my answer to reflect this. – Hussain Bohra Nov 22 '19 at 18:59
  • Thanks for your input. Your approach helped me debug the issue. The traceback is misleading: the encoding problem seems to be in the error message itself, not in the schema or the data. The actual underlying bug seems to be how column names are handled. The schema is actually defined with `F.1`, `F.2`, etc., and that causes the issue, since column names are not supposed to contain a `.`. Without the dots, the data is read without errors, but the schema is still not recognized correctly. I will file a bug report for Spark regarding the dots in column names. – hard Nov 22 '19 at 22:41
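On the dots mentioned in the last comment: in Spark SQL a dot in a column reference is normally read as access to a nested field, so a name such as `F.1` has to be wrapped in backticks whenever it is referenced. Renaming the columns up front avoids that. A minimal sketch of such a renaming rule in plain Python follows. `safe_column_name` is a hypothetical helper, and replacing the dot with an underscore is an assumption, not something the thread prescribes.

```python
import re

def safe_column_name(name, replacement="_"):
    """Strip backticks/apostrophes and replace dots in a column name."""
    without_quotes = re.sub(r"[`']", "", name)
    return without_quotes.replace(".", replacement)

raw_names = ["F.1", "`F.2`", "F3"]
clean_names = [safe_column_name(n) for n in raw_names]
```

With a DataFrame at hand, the cleaned names could then be applied in one step with `df.toDF(*clean_names)`.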