I am trying to merge 5 dataframes created from CSVs into a new dataframe. In the second step I create an empty Delta table with a custom schema, and now I want to load the records from the dataframe into that table.
Here is the step by step details.
- Creating dataframes from all 5 CSV files:
cr_df = spark.read.format("csv").option("header", "true").load("abfss://abcxxxxxxxxxxxx.dfs.core.windows.net/Position1.csv")
ir_df = spark.read.format("csv").option("header", "true").load("abfss://abc2xxxxxxxxxxxx.dfs.core.windows.net/Position2.csv")
fx_df = spark.read.format("csv").option("header", "true").load("abfss://abc3xxxxxxxxxxxx.dfs.core.windows.net/Position3.csv")
eq_df = spark.read.format("csv").option("header", "true").load("abfss://abc4xxxxxxxxxxxx.dfs.core.windows.net/Position4.csv")
co_df = spark.read.format("csv").option("header", "true").load("abfss://derdi@abc5xxxxxxxxxxxx.dfs.core.windows.net/Position5.csv")
- Merging the above dataframes:
merged_df = cr_df.unionByName(ir_df, allowMissingColumns=True) \
.unionByName(fx_df, allowMissingColumns=True) \
.unionByName(eq_df, allowMissingColumns=True) \
.unionByName(co_df, allowMissingColumns=True)
- Creating an empty table with a custom schema:
CREATE TABLE staging.ddr_position_test
(
ReportDate DATE ,
JurisdictionId INTEGER ,
TransactionId VARCHAR(256) ,
ReportAssetClass VARCHAR(30) ,
ReportTradeSequence DECIMAL(4) ,
LoadId INTEGER,
DataSourceId VARCHAR(4),
Cleared VARCHAR(50)
) USING DELTA
PARTITIONED BY (ReportDate, ReportAssetClass)
LOCATION 'abfss://slv-container@xyzxxxxxxxxxxxxx.dfs.core.windows.net//silver//delta/ddr_position_test/'
Step 4: I was getting a schema mismatch error, so I cast the columns of the merged dataframe to the target datatypes. Please note I can't use the schema overwrite or merge options (overwriteSchema / mergeSchema) because I have a requirement for specific column datatypes, and a few columns also need to be renamed.
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, IntegerType, StringType, DecimalType, TimestampType

# use col(...) to cast the existing columns; lit("ReportDate") would cast the
# literal string "ReportDate" and produce NULL in every row
df = decimal_to_string_Cols_df.withColumn("ReportDate", col("ReportDate").cast(DateType())) \
.withColumn("JurisdictionId", col("JurisdictionId").cast(IntegerType())) \
.withColumn("ReportAssetClass", col("ReportAssetClass").cast(StringType())) \
.withColumn("ReportTradeSequence", col("ReportTradeSequence").cast(DecimalType(4))) \
.withColumn("LoadId", col("LoadId").cast(IntegerType())) \
.withColumn("CreatedTimestamp", col("CreatedTimestamp").cast(TimestampType()))
And as the final step (step 5) I am writing and saving it like this:
df.write.mode('append').format('delta') \
.option("path", "abfss://container@abcdxxxxxxxxxx.dfs.core.windows.net/delta/ddr_position_test/") \
.saveAsTable("staging.ddr_position_test")
After doing some research I believe this error might be due to nullable vs. non-nullable columns (just a wild guess); any help or suggestions would be appreciated.