I have .option("mergeSchema", "true") in my code, but I am still getting a schema mismatch error.
I am reading Parquet data in which my timestamp column was stored as bigint, so I converted it to a
timestamp and then created a new column, date, which I want to partition my data on.
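from pyspark.sql import functions as F
from pyspark.sql.types import TimestampType

# Add audit columns, then derive a `date` column from the epoch-millisecond `timestamp`.
df = df.withColumn("_processed_delta_timestamp", F.current_timestamp()) \
    .withColumn("_input_file_name", F.input_file_name()) \
    .withColumn('date', F.date_format(F.date_trunc('Day', (F.col("timestamp") / 1000).cast(TimestampType())), 'yyyy-MM-dd')) \
    .withColumn('date', F.to_date(F.col('date'), 'yyyy-MM-dd'))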
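As a sanity check on the conversion above, here is a minimal plain-Python sketch of the same epoch-milliseconds to date step. It assumes the timestamp values are milliseconds since the Unix epoch (which the division by 1000 suggests) and that dates are taken in UTC; Spark uses the session time zone, so values near midnight could land on a different day. The function name and the sample value are hypothetical and only for illustration.

```python
from datetime import date, datetime, timezone


def epoch_millis_to_date(millis: int) -> date:
    """Convert epoch milliseconds to a calendar date, assuming UTC."""
    seconds = millis / 1000  # same division as in the Spark expression
    return datetime.fromtimestamp(seconds, tz=timezone.utc).date()


# Hypothetical sample value, not taken from the real data.
sample_millis = 86_400_000  # exactly one day after the epoch
print(epoch_millis_to_date(sample_millis).isoformat())
```

If the Spark job produces a different date for the same input, the session time zone is one thing worth checking.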
df.writeStream.format('delta') \
    .outputMode("append") \
    .option("mergeSchema", "true") \
    .option('checkpointLocation', checkpoint_path) \
    .partitionBy('date') \
    .option('path', output_path) \
    .toTable(f"{output_database_name}.{output_table_name}")
This is the error I am getting:
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.
Table schema:
root
 |-- metric_stream_name: string (nullable = true)
 |-- account_id: string (nullable = true)
 |-- region: string (nullable = true)
 |-- namespace: string (nullable = true)
 |-- metric_name: string (nullable = true)
 |-- dimension: struct (nullable = true)
 |    |-- ApiName: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- max: double (nullable = true)
 |    |-- min: double (nullable = true)
 |    |-- sum: double (nullable = true)
 |    |-- count: double (nullable = true)
 |-- unit: string (nullable = true)
 |-- _processed_delta_timestamp: timestamp (nullable = true)
 |-- _input_file_name: string (nullable = true)
Data schema:
root
 |-- metric_stream_name: string (nullable = true)
 |-- account_id: string (nullable = true)
 |-- region: string (nullable = true)
 |-- namespace: string (nullable = true)
 |-- metric_name: string (nullable = true)
 |-- dimension: struct (nullable = true)
 |    |-- ApiName: string (nullable = true)
 |-- timestamp: long (nullable = true)
 |-- value: struct (nullable = true)
 |    |-- max: double (nullable = true)
 |    |-- min: double (nullable = true)
 |    |-- sum: double (nullable = true)
 |    |-- count: double (nullable = true)
 |-- unit: string (nullable = true)
 |-- _processed_delta_timestamp: timestamp (nullable = true)
 |-- _input_file_name: string (nullable = true)
 |-- date: date (nullable = true)
Partition columns do not match the partition columns of the table.
Given: [`date`]
Table: [`timestamp`]
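To make the differences reported in the message easier to see, here is a small plain-Python sketch that compares the two schemas and the two partition lists. The column names and partition columns are copied from the error output above; the helper name diff_columns is hypothetical, and this only illustrates what the message reports, not how Delta performs the check internally.

```python
# Top-level column names copied from the "Table schema" section of the error.
table_columns = [
    "metric_stream_name", "account_id", "region", "namespace", "metric_name",
    "dimension", "timestamp", "value", "unit",
    "_processed_delta_timestamp", "_input_file_name",
]
# The "Data schema" section lists the same columns plus `date`.
data_columns = table_columns + ["date"]

# Partition columns copied from the last two lines of the error.
table_partitions = ["timestamp"]
data_partitions = ["date"]


def diff_columns(table, data):
    """Return (columns only in data, columns only in table)."""
    added = [c for c in data if c not in table]
    missing = [c for c in table if c not in data]
    return added, missing


added, missing = diff_columns(table_columns, data_columns)
partitions_match = table_partitions == data_partitions

print("columns only in the incoming data:", added)
print("columns only in the existing table:", missing)
print("partition columns match:", partitions_match)
```

Read this way, the output reports two separate differences: the incoming data has one extra column (date), and the existing table is partitioned by timestamp while the write asks for date.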