I am trying to use the following type of code to load multiple files into my DataFrame.

myProductDF = spark.read.format("csv").option("header","true").option("delimiter", ",").option("escape", "#").load("/mnt/Product/*")

Now it so happens that some of the files have an additional column or two. For example, the Product folder has ProductTableX.csv with 10 columns and ProductTableY.csv with 11 columns. When I try to load all the files in the Product folder into my DataFrame using the wildcard * instead of specifying the individual file names, it skips the additional column in the files that have it. Since there are thousands of files to be loaded, it's not feasible to pick and choose specific flavors. Do let me know if there is a good solution for this scenario. I will be good to go even if it defaults to NULL for the files where the extra column is not available, as long as it loads the extra column into the DataFrame with values for the files where it is present.

1 Answer

I have a solution that I use myself to load large amounts of inconsistent files. It comes from code I use for Parquet files, so there might be some small mistakes in it, but it gives you the idea.

It might not be optimal, as it cycles over all the CSV files and reads the first line of each to determine the column list.

I use the following functions:

import csv
import logging

import pyspark.sql.types as pysqlt
from pyspark.sql.functions import lit

def get_schemas(path_list):
    """
    Identify schemas from csv files contained in the path list and regroup path with the same schema
    Input : - path_list : list of paths containing csv files
    Output : dictionnary containing one entry by distinct schema, with a list of the path corresponding to each schema
    """
    schemas = {}
    i = 0

    for path in path_list:
        # Get the first row (the header) to identify the schema
        try:
            with open(path, "r", newline="") as f:
                reader = csv.reader(f, delimiter=',', escapechar='#')
                schema = next(reader)

            not_assigned = True

            # If the schema already exists in the dictionary, append the path to the path list corresponding to this schema
            for k, v in schemas.items():
                if schema == v['schema']:
                    schemas[k]['paths'].append(path)
                    not_assigned = False
                    break

            # If the schema does not exist in the dictionary, create a new entry
            if not_assigned:
                i = i+1
                schemas[str(i)] = {}
                schemas[str(i)]['schema'] = schema
                schemas[str(i)]['paths'] = []
                schemas[str(i)]['paths'].append(path)
        except Exception:
            logging.warning('Could not read the header of %s', path)

    return schemas
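
For illustration, this is roughly the kind of dictionary get_schemas returns for two hypothetical files whose headers differ by one column (the paths and column names below are made up):

schemas = get_schemas(["/mnt/Product/ProductTableX.csv", "/mnt/Product/ProductTableY.csv"])
# {'1': {'schema': ['ProductId', 'Name', 'Price'],
#        'paths': ['/mnt/Product/ProductTableX.csv']},
#  '2': {'schema': ['ProductId', 'Name', 'Price', 'Discount'],
#        'paths': ['/mnt/Product/ProductTableY.csv']}}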

# Function from https://stackoverflow.com/questions/39758045/how-to-perform-union-on-two-dataframes-with-different-amounts-of-columns-in-spar/42159279#42159279    
def harmonize_schemas_and_combine(df_left, df_right):
    left_types = {f.name: f.dataType for f in df_left.schema}
    right_types = {f.name: f.dataType for f in df_right.schema}
    left_fields = set((f.name, f.dataType, f.nullable) for f in df_left.schema)
    right_fields = set((f.name, f.dataType, f.nullable) for f in df_right.schema)

    # First go over left-unique fields
    for l_name, l_type, l_nullable in left_fields.difference(right_fields):
        if l_name in right_types:
            r_type = right_types[l_name]
            if l_type != r_type:
                raise TypeError("Union failed. Type conflict on field %s. left type %s, right type %s" % (l_name, l_type, r_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. left nullable %s, right nullable %s" % (l_name, l_nullable, not l_nullable))
        # Add the missing column to the right dataframe, filled with NULLs
        df_right = df_right.withColumn(l_name, lit(None).cast(l_type))

    # Now go over right-unique fields
    for r_name, r_type, r_nullable in right_fields.difference(left_fields):
        if r_name in left_types:
            l_type = left_types[r_name]
            if r_type != l_type:
                raise TypeError("Union failed. Type conflict on field %s. right type %s, left type %s" % (r_name, r_type, l_type))
            else:
                raise TypeError("Union failed. Nullability conflict on field %s. right nullable %s, left nullable %s" % (r_name, r_nullable, not r_nullable))
        # Add the missing column to the left dataframe, filled with NULLs
        df_left = df_left.withColumn(r_name, lit(None).cast(r_type))

    # Make sure columns are in the same order
    df_left = df_left.select(df_right.columns)

    return df_left.union(df_right)
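
As a side note, if your cluster runs Spark 3.1 or later, unionByName with allowMissingColumns=True does this NULL-filling for you (as long as the shared columns have compatible types), so the helper above could be reduced to something like this sketch:

def harmonize_schemas_and_combine_v2(df_left, df_right):
    # Columns present on only one side are added to the other side as NULLs
    return df_left.unionByName(df_right, allowMissingColumns=True)

I keep the explicit version because it fails fast on type or nullability conflicts, but either works for the merge itself.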

def create_df_path_list(path_list):
    """
    Create a dataframe by concatenating all databases from the path list
    Input : - path_list : list of paths corresponding to databases
    Output : concatenated dataframe built from all databases in the path list
    """
    # Get all different schemas from path_list
    logging.info('Inferring schemas from the path list')
    schemas = get_schemas(path_list)

    logging.info('The path list contains %s different schemas', len(schemas))

    df = spark.createDataFrame([], pysqlt.StructType([]))
    dict_df = {}
    # Load each set of path with the same schema into separate dataframes
    for k, v in schemas.items():
        logging.info('Reading %s paths with schema %s', len(v['paths']), k)
        dict_df[k] = spark.read.format("csv").option("header","true").option("delimiter", ",").option("escape", "#").load(v['paths'])

    #Merge all the dataframes
    for k, v in dict_df.items():
        df = harmonize_schemas_and_combine(df, v)

    return df

After that, you just have to call the function:

import os

csv_paths = [os.path.join("/mnt/Product/", f) for f in os.listdir("/mnt/Product/") if f.endswith(".csv")]

df = create_df_path_list(csv_paths)
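
To sanity-check the result, you can print the merged schema and count the rows where the extra column stayed NULL (the column name here is just an example):

df.printSchema()
df.where(df["Discount"].isNull()).count()  # should include every row from files without the extra column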

It might not be perfect, but I hope it helps!
