
In this question, I asked how to combine PySpark data frames that have different columns. The answer given required that every data frame first be padded to the same set of columns before they could all be unioned:

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder\
    .appName("DynamicFrame")\
    .getOrCreate()

df01 = spark.createDataFrame([(1, 2, 3), (9, 5, 6)], ("C1", "C2", "C3"))
df02 = spark.createDataFrame([(11, 12, 13), (10, 15, 16)], ("C2", "C3", "C4"))
df03 = spark.createDataFrame([(111, 112), (110, 115)], ("C1", "C4"))

dataframes = [df01, df02, df03]

# Create a list of all the column names and sort them
cols = set()
for df in dataframes:
    for x in df.columns:
        cols.add(x)
cols = sorted(cols)

# Create a dictionary with all the dataframes
dfs = {}
for i, d in enumerate(dataframes):
    new_name = 'df' + str(i)  # New name for the key, the dataframe is the value
    dfs[new_name] = d
    # Loop through all column names. Add the missing columns to the dataframe (with value 0)
    for x in cols:
        if x not in d.columns:
            dfs[new_name] = dfs[new_name].withColumn(x, lit(0))
    dfs[new_name] = dfs[new_name].select(cols)  # Use 'select' to get the columns sorted

# Now put it all together with a loop (union)
result = dfs['df0']            # Take the first dataframe, add the others to it
dfs_to_add = list(dfs.keys())  # list() so .remove() works on Python 3 (dict_keys has no remove)
dfs_to_add.remove('df0')       # Remove the first one, because it is already in the result
for x in dfs_to_add:
    result = result.union(dfs[x])
result.show()

Is there any way to combine PySpark data frames without first having to ensure that they all share the same columns? The reason I ask is that merging about 100 data frames with the code above ran for roughly 2 days before the process timed out.

1 Answer


In Spark 3.1 and later, use unionByName with allowMissingColumns=True; columns missing from either frame are filled with nulls:

df = df1.unionByName(df2, allowMissingColumns=True)

puligun
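The accepted one-liner handles two frames; for a whole list, the same call can be folded with functools.reduce. The sketch below uses a minimal pure-Python stand-in class (FakeFrame, illustrative only) so it runs without a Spark session; with real PySpark DataFrames the reduce lambda is identical:

```python
from functools import reduce

# Minimal stand-in for a DataFrame so the fold pattern runs without Spark.
# Real PySpark DataFrames expose the same unionByName(other, allowMissingColumns) call.
class FakeFrame:
    def __init__(self, rows, columns):
        self.columns = list(columns)
        self.rows = [dict(zip(columns, r)) for r in rows]

    def unionByName(self, other, allowMissingColumns=False):
        # Columns of self, followed by any extra columns from other
        all_cols = self.columns + [c for c in other.columns if c not in self.columns]
        if not allowMissingColumns:
            assert set(self.columns) == set(other.columns)
        out = FakeFrame([], all_cols)
        # Missing columns come back as None, mirroring Spark's null fill
        out.rows = [{c: row.get(c) for c in all_cols} for row in self.rows + other.rows]
        return out

df01 = FakeFrame([(1, 2, 3)], ("C1", "C2", "C3"))
df02 = FakeFrame([(11, 12, 13)], ("C2", "C3", "C4"))
df03 = FakeFrame([(111, 112)], ("C1", "C4"))

# With real DataFrames this line is unchanged: reduce the union over the list
result = reduce(
    lambda a, b: a.unionByName(b, allowMissingColumns=True),
    [df01, df02, df03],
)
```

Because each union is a single narrow transformation, Spark plans the whole fold lazily; no padding columns need to be added by hand first.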
  • Is that only for Spark 3.1? – timeinception23 Jun 28 '21 at 19:30
  • Yes. If you are using an older version, for union to work you should make sure all the dataframes you are combining have the same column names. Use withColumn and add nulls where the columns don't exist – puligun Jun 28 '21 at 19:36
  • So would have to do something similar to the answer in this question: https://stackoverflow.com/questions/53165816/pyspark-dynamic-union-of-dataframes-with-different-columns – timeinception23 Jun 28 '21 at 19:37
  • Is there an easier way of doing it? Can you add it to your answer for older versions of Spark? – timeinception23 Jun 28 '21 at 19:38
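For pre-3.1 Spark, the recipe from the comments (pad each frame with null columns via withColumn(lit(None)), reorder with select, then union) boils down to the alignment step below, sketched here in plain Python over lists of row dicts so it runs without a Spark session; align_columns is an illustrative name, not a Spark API:

```python
def align_columns(tables):
    """Pad every table (a list of row dicts) to the same sorted column set.

    Mirrors the pre-Spark-3.1 recipe: for each missing column, add it as
    null (Spark: withColumn(c, lit(None))), then select the sorted columns.
    """
    # Union of every column name seen across all tables, sorted for stability
    all_cols = sorted({c for t in tables for row in t for c in row})
    # Rebuild each row with every column, filling gaps with None (null)
    aligned = [[{c: row.get(c) for c in all_cols} for row in t] for t in tables]
    return all_cols, aligned

cols, (t1, t2) = align_columns([
    [{"C1": 1, "C2": 2, "C3": 3}],
    [{"C2": 11, "C3": 12, "C4": 13}],
])
```

Once every frame shares the same columns in the same order, a plain union over the list completes the merge, exactly as the loop in the question does.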