
I want to read multiple CSV files with different numbers of columns using PySpark.

Files = ['Data/f1.csv', 'Data/f2.csv', 'Data/f3.csv', 'Data/f4.csv', 'Data/f5.csv']

The f1 file has 50 columns; f2 has 10 more than f1, for a total of 60 columns; f3 has 30 more than f1, for a total of 80 columns; and so on.

However,

df = spark.read.csv(Files, header=True)

gives only 50 columns, but I am expecting 80. Since f1 has only 50 columns, the remaining 30 columns should be filled with NaN values for f1's rows, and the same goes for the other CSV files. Pandas gives me all 80 columns perfectly:

import pandas as pd

df = pd.concat(map(pd.read_csv, ['Data/f1.csv', 'Data/f2.csv', 'Data/f3.csv', 'Data/f4.csv', 'Data/f5.csv']))

But I can't do the same thing with PySpark. How can I read all the columns of the above 5 CSV files into a single Spark dataframe?

ForestGump
  • Are these files related in some way? Do they have the same number of rows and the rows are all in order? – tdelaney Feb 01 '23 at 03:24
  • Each day in the Backblaze data center, we take a snapshot of each operational hard drive. This snapshot includes basic drive information along with the S.M.A.R.T. statistics reported by that drive. The daily snapshot of one drive is one record or row of data. All of the drive snapshots for a given day are collected into a file consisting of a row for each active hard drive. – ForestGump Feb 01 '23 at 03:38
  • https://www.backblaze.com/b2/hard-drive-test-data.html is the data source – ForestGump Feb 01 '23 at 03:38

2 Answers


You can read each file into its own Spark dataframe and then combine all the dataframes into one using union.

First, fill in the missing columns of the dataframes that have fewer columns; then merge them with union, applied pairwise via reduce.

from functools import reduce
from pyspark.sql.functions import lit, col

# Read each CSV file into its own dataframe
df_list = [spark.read.csv("f{}.csv".format(i), header=True) for i in range(1, 6)]

# Width of the widest schema
max_cols = max(len(df.columns) for df in df_list)

# Pad each dataframe with NULL columns up to the widest schema
df_list = [
    df.select(
        *[col(c) for c in df.columns],
        *[lit(None).alias("col_{}".format(i)) for i in range(len(df.columns), max_cols)],
    )
    for df in df_list
]

# union matches columns by position, so the padded dataframes line up
df_final = reduce(lambda x, y: x.union(y), df_list)

I reproduced your case in this GitHub repo.
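As an aside (not part of the original answer), if you are on Spark 3.1 or later, the manual padding can be skipped entirely: unionByName with allowMissingColumns=True resolves columns by name and fills the absent ones with NULL. A minimal sketch, assuming the same Data/f1.csv through Data/f5.csv files:

from functools import reduce

# Read each file with its own (partial) set of columns
df_list = [spark.read.csv("Data/f{}.csv".format(i), header=True) for i in range(1, 6)]

# Spark 3.1+: match columns by name; columns absent from a file become NULL
df_final = reduce(lambda x, y: x.unionByName(y, allowMissingColumns=True), df_list)

Unlike the positional union above, this also stays correct if the shared columns appear in a different order across files.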

Daniel Sobrado
  • How does this line spark.read.csv("f{}.csv".format(i), header=True) for i in range(1, 6) work? – ForestGump Feb 01 '23 at 04:07
  • The file names are generated by "f{}.csv".format(i), where {} is a placeholder for the value of i supplied by the for loop. On each iteration of the loop, i advances by 1 and the placeholder is replaced with that value (see the sketch below). Please accept the answer if that solves your problem. – Daniel Sobrado Feb 01 '23 at 04:48
  • How will it access the CSV files from the list, say Files=['Data/f1.csv','Data/f2.csv','Data/f3.csv','Data/f4.csv','Data/f5.csv']? – ForestGump Feb 01 '23 at 04:51
  • I tried df_list=[spark.read.csv(List5Files, header=True)], but it didn't work – ForestGump Feb 01 '23 at 05:01
  • Instead of f{}.csv, use Data/f{}.csv – Daniel Sobrado Feb 01 '23 at 05:08
  • df_list = [spark.read.csv("Data/f{}.csv".format(i), header=True) for i in range(1, 6)] – Daniel Sobrado Feb 01 '23 at 05:25
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/251538/discussion-between-daniel-sobrado-and-forestgump). – Daniel Sobrado Feb 01 '23 at 05:37
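To make the file-name generation discussed in the comments concrete, here is a minimal sketch of the format-in-a-list-comprehension pattern:

# "Data/f{}.csv".format(i) substitutes each i for the {} placeholder
files = ["Data/f{}.csv".format(i) for i in range(1, 6)]
print(files)
# ['Data/f1.csv', 'Data/f2.csv', 'Data/f3.csv', 'Data/f4.csv', 'Data/f5.csv']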

It was a very easy fix. Here is what I did:

Files = ['Data/f1.csv', 'Data/f2.csv', 'Data/f3.csv', 'Data/f4.csv', 'Data/f5.csv']
Files.reverse()  # put the widest file (80 columns) first
df = spark.read.csv(Files, inferSchema=True, header=True)

The last files had all 80 columns because columns were added incrementally over time, and Spark derives the column set from the first file it reads. Reversing the list puts a file with all the columns first, which solved the issue.
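An order-independent variant (my addition, not part of the original answer) is to capture the widest file's schema once and pass it explicitly, so the result no longer depends on the order of the list. A minimal sketch, assuming Data/f5.csv is the file with all 80 columns and that new columns are only ever appended at the end (CSV parsing is positional):

Files = ['Data/f1.csv', 'Data/f2.csv', 'Data/f3.csv', 'Data/f4.csv', 'Data/f5.csv']

# Assumption: Data/f5.csv carries the full 80-column header
schema = spark.read.csv('Data/f5.csv', header=True, inferSchema=True).schema

# With an explicit schema, shorter rows are padded with NULL in permissive mode
df = spark.read.csv(Files, header=True, schema=schema)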

ForestGump