
I have multiple files as shown below. My task is to read all those files, merge them, and create one final dataframe. However, one file (Measurement_table_sep_13th.csv) has to be summarized before being used for the merge, because it is too large to join as-is.

import glob

filenames = sorted(glob.glob('*.csv'))
filenames   # lists every CSV file in the folder, including Measurement_table_sep_13th.csv


from pyspark.sql import functions as F

filenames = sorted(glob.glob('*.csv'))
for f in filenames:
    print(f)
    if f == 'Measurement_table_sep_13th.csv':
        # summarize the huge measurement file before merging
        df = spark.read.csv(f, sep=",", inferSchema=True, header=True)
        df = df.groupby("person_id", "visit_occurrence_id").pivot("measurement_concept_id").agg(
            F.mean(F.col("value_as_number")), F.min(F.col("value_as_number")), F.max(F.col("value_as_number")),
            F.count(F.col("value_as_number")), F.stddev(F.col("value_as_number")),
            F.expr('percentile_approx(value_as_number, 0.25)').alias("25_pc"),
            F.expr('percentile_approx(value_as_number, 0.75)').alias("75_pc"))
    else:
        df = spark.read.csv(f, sep=",", inferSchema=True, header=True)

    try:
        JKeys = ['person_id', 'visit_occurrence_id'] if 'visit_occurrence_id' in df.columns else ['person_id']
        print(JKeys)
        df_final = df_final.join(df, on=JKeys, how='left')
        print("success in try")
    except:
        # first iteration: df_final does not exist yet
        df_final = df
        print("success in except")

As you can see, I am summarizing the Measurement_table_sep_13th.csv file before merging, but is there a more elegant and efficient way to write this?


1 Answer


If you do not want to move that one file into a different folder, you can also exclude it directly with glob (see this post: glob exclude pattern):

files = glob.glob('files_path/[!_]*')

You can use this to glob all the files except your measurement file, read and combine them in one go, and then join the summarized measurement table afterwards, which lets you avoid the long if-code.
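Note that a pattern like [!M]*.csv excludes every file whose name starts with M, not just the measurement table. If other files in the folder also start with M, a minimal sketch that drops only that one file by its exact name (taken from the question) would be:

import glob

# keep every CSV except the huge measurement table, excluded by exact name
files = [f for f in sorted(glob.glob('*.csv')) if f != 'Measurement_table_sep_13th.csv']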

The full version would then look like this (following this post: Loading multiple csv files of a folder into one dataframe):

import glob

import pandas as pd
from pyspark.sql import functions as F

# read every CSV except the measurement file (the [!M] pattern skips names starting with M)
files = glob.glob("[!M]*.csv")
dfs = [pd.read_csv(f, sep=",") for f in files]
df2 = spark.createDataFrame(pd.concat(dfs, ignore_index=True))

# summarize the measurement file, as in the question
df = spark.read.csv('Measurement_table_sep_13th.csv', sep=",", inferSchema=True, header=True)
df = df.groupby("person_id", "visit_occurrence_id").pivot("measurement_concept_id").agg(
    F.mean(F.col("value_as_number")), F.min(F.col("value_as_number")), F.max(F.col("value_as_number")),
    F.count(F.col("value_as_number")), F.stddev(F.col("value_as_number")),
    F.expr('percentile_approx(value_as_number, 0.25)').alias("25_pc"),
    F.expr('percentile_approx(value_as_number, 0.75)').alias("75_pc"))

JKeys = ['person_id', 'visit_occurrence_id'] if 'visit_occurrence_id' in df2.columns else ['person_id']
df_final = df.join(df2, on=JKeys, how='left')
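Note that pd.concat stacks the non-measurement files row-wise, so this assumes they all share the same columns; spark.createDataFrame is only there to bring the combined pandas frame back into Spark for the final join. If the files instead carry different columns per person_id (which is what the joins in the original loop suggest), a hedged Spark-only sketch that folds the joins with functools.reduce could look like this (it assumes an active SparkSession named spark, as in the question):

import glob
from functools import reduce

files = [f for f in sorted(glob.glob('*.csv')) if f != 'Measurement_table_sep_13th.csv']
dfs = [spark.read.csv(f, sep=",", inferSchema=True, header=True) for f in files]

def join_on_keys(left, right):
    # include visit_occurrence_id in the join keys only when both sides have it
    keys = (['person_id', 'visit_occurrence_id']
            if 'visit_occurrence_id' in left.columns and 'visit_occurrence_id' in right.columns
            else ['person_id'])
    return left.join(right, on=keys, how='left')

df_final = reduce(join_on_keys, dfs)
df_final = join_on_keys(df_final, df)   # df is the summarized measurement table from above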