In a Databricks notebook (Databricks Runtime 10.4 LTS), one file per day is read, covering four years. At some point during those four years, new columns were added to the files. On the initial run, the dataframe schema is created without the new columns, so reprocessing the files generates an error.

Is there a way to refresh that dataframe (preferably in the pipeline, but not necessarily) so that it automatically recognizes the new columns and fills them with nulls for the older files?

The dataframe should continue reading the rows without an error when it encounters a new column.

Assuming the files are in the Databricks repository, read in each file using:

for row in dfdir.collect():
    df = spark.read.load(row.path, format="csv", sep="\t", encoding="utf-16", inferSchema="false", header="true")

The error looks like:

Column 'e5' does not exist.  Did you mean one of the following: [e1, e2, e3, e4, dt]

+---+---+----+----+------+
| e1| e2|  e3|  e4|  dt  |
+---+---+----+----+------+
|  a|  b|   c|null|1/1/20|
|  s|  d|   a|   d|1/2/20|
|  f|  s|null|null|1/3/20|
+---+---+----+----+------+

Later, on 1/4/20, a new column (e5) is added:

+---+---+----+----+------+-------+
| e1| e2|  e3|  e4|  dt  |     e5|
+---+---+----+----+------+-------+
|  a|  b|   c|null|1/1/20|       |
|  s|  d|   a|   d|1/2/20|       |
|  f|  s|null|null|1/3/20|       |
|  f|  s|null|null|1/4/20|newdata|
+---+---+----+----+------+-------+

Desired dataframe result

+---+---+----+----+------+-------+
| e1| e2|  e3|  e4|  dt  |     e5|
+---+---+----+----+------+-------+
|  a|  b|   c|null|1/1/20|  null |
|  s|  d|   a|   d|1/2/20|  null |
|  f|  s|null|null|1/3/20|  null |
|  f|  s|null|null|1/4/20|newdata|
+---+---+----+----+------+-------+

This is a bit of an oversimplification of the problem, as the table has roughly 4 million records, but I'm hoping it shows enough of the issue to allow it to be reproduced. Each file contains about 4000 rows and there are about 1000 days' worth of files. The earlier files have just over 30 columns and the later ones have 35.

jamiel22

2 Answers

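Use the approach from "Add an empty column to Spark DataFrame" to add a null column when it is missing:

from pyspark.sql.functions import lit
from pyspark.sql.types import StringType

if "e5" not in df.columns:
    df = df.withColumn("e5", lit(None).cast(StringType()))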
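With about 35 columns and several of them added over time, checking each name by hand gets tedious. As a rough sketch, the same idea can be generalized: compare each file's columns against a full list and add whatever is missing as a null string column. The names `expected_columns`, `missing_columns`, and `add_missing_columns` are made up for illustration, and the column list assumes the example layout from the question.

```python
# Hypothetical full column list, taken from the example in the question.
expected_columns = ["e1", "e2", "e3", "e4", "dt", "e5"]


def missing_columns(existing, expected):
    """Return the expected column names absent from `existing`, in order."""
    present = set(existing)
    return [name for name in expected if name not in present]


def add_missing_columns(df, expected):
    """Add each missing column as a null string column, then fix the order."""
    # Imported here so the pure-Python helper above works without Spark.
    from pyspark.sql.functions import lit
    from pyspark.sql.types import StringType

    for name in missing_columns(df.columns, expected):
        df = df.withColumn(name, lit(None).cast(StringType()))
    # Select in a fixed order so every file yields the same schema.
    return df.select(*expected)
```

Inside the read loop this would be `df = add_missing_columns(df, expected_columns)`, after which the per-file dataframes share one schema and can be combined with `unionByName`.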
IMarvinTPA

Are you saving in Parquet format or Delta format?

If it is the latter, you can use the schema evolution feature provided by Delta Lake:

your_df.write.format("delta") \
           .option("mergeSchema", "true") \
           .mode("append") \
           .save(DELTALAKE_SILVER_PATH)
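Applied to the loop from the question, this might look like the sketch below: each daily file is appended as-is, and Delta Lake widens the table schema the first time a file with extra columns arrives. Rows written before that point read back as null in the new columns. The function names and the `target_path` parameter are hypothetical; the read options are copied from the question.

```python
def append_with_schema_evolution(df, target_path):
    """Append df to a Delta table, letting Delta add any new columns."""
    (
        df.write.format("delta")
        .option("mergeSchema", "true")  # allow new columns on append
        .mode("append")
        .save(target_path)
    )


def load_all_files(spark, paths, target_path):
    """Read each tab-separated UTF-16 file and append it to the Delta table."""
    for path in paths:
        df = spark.read.load(
            path,
            format="csv",
            sep="\t",
            encoding="utf-16",
            inferSchema="false",
            header="true",
        )
        append_with_schema_evolution(df, target_path)
```

Reading the table back afterwards with `spark.read.format("delta").load(target_path)` should then return all 35 columns for every day.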
M. Alexandru