I have a solution that I use myself to load large numbers of inconsistent files. It is adapted from code I use for Parquet files, so there might be some small mistakes in it, but it gives you the idea.
Admittedly, it is not fully optimized, since it cycles over all the csv files and reads the first line of each to determine the column list.
I use the following functions:
import csv
import logging
import pyspark.sql.types as pysqlt
from pyspark.sql.functions import lit
def get_schemas(path_list):
    """
    Identify the schemas of the csv files contained in the path list and group paths sharing the same schema.
    Input  : - path_list : list of paths to csv files
    Output : dictionary containing one entry per distinct schema, with the list of paths corresponding to that schema
    """
    schemas = {}
    i = 0
    for path in path_list:
        # Read the first row (the header) to identify the schema
        try:
            with open(path, "r", newline="") as f:
                reader = csv.reader(f, delimiter=',', escapechar='#')
                schema = next(reader)
            not_assigned = True
            # If the schema already exists in the dictionary, append the path to the path list of that schema
            for k, v in schemas.items():
                if schema == v['schema']:
                    schemas[k]['paths'].append(path)
                    not_assigned = False
                    break
            # If the schema does not exist in the dictionary, create a new entry
            if not_assigned:
                i = i + 1
                schemas[str(i)] = {'schema': schema, 'paths': [path]}
        except Exception:
            logging.info('Impossible to get schema for %s', path)
    return schemas
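For illustration, if two files share the same header and a third one has an extra column, the returned dictionary looks roughly like this (the file names here are hypothetical):

# Hypothetical output of get_schemas on three files:
# {'1': {'schema': ['id', 'name'],          'paths': ['/mnt/Product/a.csv', '/mnt/Product/b.csv']},
#  '2': {'schema': ['id', 'name', 'price'], 'paths': ['/mnt/Product/c.csv']}}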
# Function from https://stackoverflow.com/questions/39758045/how-to-perform-union-on-two-dataframes-with-different-amounts-of-columns-in-spar/42159279#42159279
def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)
    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        # Field is missing on the right: add it as a null column
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))
    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        # Field is missing on the left: add it as a null column
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))
    # Make sure the columns are in the same order before the union
    df_left = df_left.select(df_right.columns)
    return df_left.union(df_right)
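To see what it does, here is a minimal sanity check on two toy dataframes (assuming an active SparkSession named spark; the column names are made up):

# Toy example: columns missing on one side get filled with nulls
df_a = spark.createDataFrame([(1, "x")], ["id", "name"])
df_b = spark.createDataFrame([(2, 9.99)], ["id", "price"])
combined = harmonize_schemas_and_combine(df_a, df_b)
combined.show()  # columns id, price, name, with null where a side had no value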
def create_df_path_list(path_list):
    """
    Create a dataframe by concatenating all the databases from the path list.
    Input  : - path_list : list of paths corresponding to databases
    Output : concatenated dataframe built from all the databases in the path list
    """
    # Get all the distinct schemas from the path list
    logging.info('Inferring schemas from the path list')
    schemas = get_schemas(path_list)
    logging.info('The path list contains %s different schemas', len(schemas))
    df = spark.createDataFrame(spark.sparkContext.emptyRDD(), pysqlt.StructType([]))
    dict_df = {}
    # Load each set of paths sharing the same schema into a separate dataframe
    for k, v in schemas.items():
        logging.info('Reading %s paths with schema %s', len(v['paths']), k)
        dict_df[k] = spark.read.format("csv").option("header", "true").option("delimiter", ",").option("escape", "#").load(v['paths'])
    # Merge all the dataframes
    for v in dict_df.values():
        df = harmonize_schemas_and_combine(df, v)
    return df
After that, you just have to call the function:
import os
csv_paths = [os.path.join("/mnt/Product/", file) for file in os.listdir("/mnt/Product/") if file.endswith(".csv")]
df = create_df_path_list(csv_paths)
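If you prefer a glob pattern over os.listdir, this hypothetical variant does the same thing:

import glob
# Same call, with the file list built from a glob pattern instead of os.listdir
csv_paths = glob.glob("/mnt/Product/*.csv")
df = create_df_path_list(csv_paths)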
It might not be perfect, but I hope it helps!