
In the code below I'm trying to merge a DataFrame into a Delta table. I join the new DataFrame with the Delta table, transform the joined data to match the Delta table schema, and then merge the result into the Delta table.

But I'm getting an AnalysisException:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Resolved attribute(s) id#514 missing from _file_name_#872,age#516,id#879,name#636,age#881,name#880,city#882,id#631,_row_id_#866L,city#641 in operator !Join Inner, (id#514 = id#631). Attribute(s) with the same name appear in the operation: id. Please check if the right attribute(s) are used.;;
!Join Inner, (id#514 = id#631)
:- SubqueryAlias deltaData
:  +- Project [id#631, name#636, age#516, city#641]
:     +- Project [age#516, id#631, name#636, new_city#510 AS city#641]
:        +- Project [age#516, id#631, new_name#509 AS name#636, new_city#510]
:           +- Project [age#516, new_id#508 AS id#631, new_name#509, new_city#510]
:              +- Project [age#516, new_id#508, new_name#509, new_city#510]
:                 +- Join Inner, (id#514 = new_id#508)
:                    :- Relation[id#514,name#515,age#516,city#517] parquet
:                    +- LocalRelation [new_id#508, new_name#509, new_city#510]
+- Project [id#879, name#880, age#881, city#882, _row_id_#866L, input_file_name() AS _file_name_#872]
   +- Project [id#879, name#880, age#881, city#882, monotonically_increasing_id() AS _row_id_#866L]
      +- Project [id#854 AS id#879, name#855 AS name#880, age#856 AS age#881, city#857 AS city#882]
         +- Relation[id#854,name#855,age#856,city#857] parquet

My setup is Spark 3.0.0, Delta Lake 0.7.0, Hadoop 2.7.4

However, the code below runs fine on the Databricks 7.4 runtime, where the new DataFrame is merged into the Delta table as expected.

Code Snippet:

import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{SaveMode, SparkSession}

object CodePen extends App {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()

  val deltaPath = "<delta-path>"
  val oldEmployee = Seq(
    Employee(10, "Django", 22, "Bangalore"),
    Employee(11, "Stephen", 30, "Bangalore"),
    Employee(12, "Calvin", 25, "Hyderabad"))

  val newEmployee = Seq(EmployeeNew(10, "Django", "Bangkok"))
  spark.createDataFrame(oldEmployee).write.format("delta").mode(SaveMode.Overwrite).save(deltaPath) // Saving the data to a delta table
  val newDf = spark.createDataFrame(newEmployee)

  val deltaTable = DeltaTable.forPath(deltaPath)
  val joinedDf = deltaTable.toDF.join(newDf, col("id") === col("new_id"), "inner")

  joinedDf.show()
  val cols = newDf.columns
  // Transforming the joined Dataframe to match the schema of the delta table
  var intDf = joinedDf.drop(cols.map(removePrefix): _*)
  for (column <- newDf.columns)
    intDf = intDf.withColumnRenamed(column, removePrefix(column))

  intDf = intDf.select(deltaTable.toDF.columns.map(col): _*)

  deltaTable.toDF.show()
  intDf.show()

  deltaTable.as("oldData")
    .merge(
      intDf.as("deltaData"),
      col("oldData.id") === col("deltaData.id"))
    .whenMatched()
    .updateAll()
    .execute()

  deltaTable.toDF.show()

  def removePrefix(column: String) = {
    column.replace("new_", "")
  }
}

case class Employee(id: Int, name: String, age: Int, city: String)

case class EmployeeNew(new_id: Int, new_name: String, new_city: String)

Below is the output of the dataframes.

New Dataframe:

+---+------+-------+
| id|  name|   city|
+---+------+-------+
| 10|Django|Bangkok|
+---+------+-------+

Joined Dataframe:

+---+------+---+---------+------+--------+--------+
| id|  name|age|     city|new_id|new_name|new_city|
+---+------+---+---------+------+--------+--------+
| 10|Django| 22|Bangalore|    10|  Django| Bangkok|
+---+------+---+---------+------+--------+--------+

Delta Table Data:

+---+-------+---+---------+
| id|   name|age|     city|
+---+-------+---+---------+
| 11|Stephen| 30|Bangalore|
| 12| Calvin| 25|Hyderabad|
| 10| Django| 22|Bangalore|
+---+-------+---+---------+

Transformed New Dataframe:

+---+------+---+-------+
| id|  name|age|   city|
+---+------+---+-------+
| 10|Django| 22|Bangkok|
+---+------+---+-------+
Michael Heil
Ashis Jena
  • Writing the "transformed new dataframe" to Parquet, reading those files back into a DataFrame, and merging that DataFrame into the Delta table works fine. – Ashis Jena Feb 05 '21 at 09:21

1 Answer

You are getting this AnalysisException because the schemas of deltaTable and intDf are slightly different:

deltaTable.toDF.printSchema()
root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)



intDf.printSchema()
root
 |-- id: integer (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)

Because intDf comes out of a join in which the column "id" is the key, its "id" column ends up non-nullable, whereas the Delta table's "id" column is nullable.

If you change the nullable property of intDf's "id" column to true, you will get the desired output:

+---+-------+---+---------+
| id|   name|age|     city|
+---+-------+---+---------+
| 11|Stephen| 30|Bangalore|
| 12| Calvin| 25|Hyderabad|
| 10| Django| 22|  Bangkok|
+---+-------+---+---------+

Tested with Spark 3.0.1 and Delta 0.7.0.
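
One way to change the nullable property is sketched below. It is only a minimal sketch, not necessarily the approach the answer had in mind: the object name NullableFix and the helper withAllNullable are made up for illustration. It copies the schema with every field marked nullable and rebuilds the DataFrame from its RDD using that schema.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

// Hypothetical helper, not part of Spark or Delta Lake.
object NullableFix {
  // Returns a DataFrame with the same rows and column names/types,
  // but with every top-level column marked nullable = true.
  def withAllNullable(df: DataFrame): DataFrame = {
    // Copy each StructField, changing only the nullable flag.
    val relaxedSchema = StructType(df.schema.map(_.copy(nullable = true)))
    // Re-create the DataFrame from the underlying RDD[Row] with the relaxed schema.
    df.sparkSession.createDataFrame(df.rdd, relaxedSchema)
  }
}
```

In the question's code this would presumably be used as NullableFix.withAllNullable(intDf).as("deltaData") in the merge call. Note that the helper goes through df.rdd, so the resulting DataFrame gets a fresh logical plan, much like the Parquet round trip mentioned in the comment on the question.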

Michael Heil