I have a historical DataFrame that is partitioned by deptno.
df1:
+-----+------+---------+----+----------+-------+-------+------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|
+-----+------+---------+----+----------+-------+-------+------+
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600.00| 300.00| 30|
| 7521| WARD| SALESMAN|7698|1981-02-22|1250.00| 500.00| 30|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250.00|1400.00| 30|
| 7698| BLAKE| MANAGER|7839|1981-05-01|2850.00| null| 30|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500.00| 0.00| 30|
| 7900| JAMES| CLERK|7698|1981-12-03| 950.00| null| 30|
| 7369| SMITH| CLERK|7902|1980-12-17| 800.00| null| 20|
| 7566| JONES| MANAGER|7839|1981-04-02|2975.00| null| 20|
| 7788| SCOTT| ANALYST|7566|1982-12-09|3000.00| null| 20|
| 7876| ADAMS| CLERK|7788|1983-01-12|1100.00| null| 20|
| 7782| CLARK| MANAGER|7839|1981-06-09|2450.00| null| 10|
| 7839| KING|PRESIDENT|7782|1981-11-17|5000.00| null| 10|
+-----+------+---------+----+----------+-------+-------+------+
I receive updates in another DataFrame.
df2:
+-----+-----+---------+----+----------+--------+----+------+
|empno|ename|      job| mgr|  hiredate|     sal|comm|deptno|
+-----+-----+---------+----+----------+--------+----+------+
| 7839| KING|   Leader|7782|1981-11-17|15000.00|null|    10|
+-----+-----+---------+----+----------+--------+----+------+
Now I want to update the old record (empno=7839) with the new, updated record by overwriting only the affected partition (deptno=10):
+-----+------+---------+----+----------+--------+-------+------+
|empno| ename|      job| mgr|  hiredate|     sal|   comm|deptno|
+-----+------+---------+----+----------+--------+-------+------+
| 7499| ALLEN| SALESMAN|7698|1981-02-20| 1600.00| 300.00|    30|
| 7566| JONES|  MANAGER|7839|1981-04-02| 2975.00|   null|    20|
| 7900| JAMES|    CLERK|7698|1981-12-03|  950.00|   null|    30|
| 7839|  KING|   Leader|7782|1981-11-17|15000.00|   null|    10|
| 7369| SMITH|    CLERK|7902|1980-12-17|  800.00|   null|    20|
| 7782| CLARK|  MANAGER|7839|1981-06-09| 2450.00|   null|    10|
| 7876| ADAMS|    CLERK|7788|1983-01-12| 1100.00|   null|    20|
| 7844|TURNER| SALESMAN|7698|1981-09-08| 1500.00|   0.00|    30|
| 7788| SCOTT|  ANALYST|7566|1982-12-09| 3000.00|   null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28| 1250.00|1400.00|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01| 2850.00|   null|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22| 1250.00| 500.00|    30|
+-----+------+---------+----+----------+--------+-------+------+
I have the solution below, but it overwrites all partitions instead of only the modified ones.
import org.apache.spark.sql.functions.coalesce

val df1 = spark.read.orc("data/emp/")    // historical data, partitioned by deptno

val finalQry = "SELECT * FROM emp"
val df2 = spark.read.format("jdbc")      // updated records from MySQL
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("user", "root")
  .option("dbtable", "(" + finalQry + ") as t")
  .load()

// For each column, take the updated value from df2 and fall back to df1
val projections = df1.schema.fields.map { field =>
  coalesce(df2.col(field.name), df1.col(field.name)).as(field.name)
}
val finalDf = df1
  .join(df2, df1.col("empno") === df2.col("empno"), "fullouter")
  .select(projections: _*)

// This rewrites every partition under data/emp/, not just deptno=10
finalDf.write.format("orc").mode("overwrite").partitionBy("deptno").save("data/emp/")
Given the huge size of the historical data, I need a solution that identifies the partitions containing updated records and overwrites only those. As far as I can tell, mode("overwrite") with partitionBy in the default (static) partition-overwrite mode truncates the whole data/emp/ path before writing, which is why every partition gets rewritten.
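One direction I have been looking at (not sure it is correct) is Spark's dynamic partition overwrite mode, available since Spark 2.3, combined with pre-filtering df1 down to the partitions that df2 actually touches so the join does not scan all of history. A minimal sketch under those assumptions; updatedDepts, df1Affected, and mergedDf are names I made up:

import org.apache.spark.sql.functions.{broadcast, coalesce, col}

// Only rewrite partitions that appear in the written data (Spark 2.3+),
// instead of truncating all of data/emp/ first.
spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Keep only historical rows from partitions that df2 touches, so the join
// and the rewrite cover just the affected deptno values.
val updatedDepts = df2.select("deptno").distinct()
val df1Affected = df1.join(broadcast(updatedDepts), Seq("deptno"), "left_semi")

// Same column-wise merge as above, restricted to the affected partitions.
val projections = df1Affected.schema.fields.map { field =>
  coalesce(df2.col(field.name), df1Affected.col(field.name)).as(field.name)
}
val mergedDf = df1Affected
  .join(df2, df1Affected.col("empno") === df2.col("empno"), "fullouter")
  .select(projections: _*)

// With dynamic overwrite enabled, only deptno=10 should be rewritten here.
mergedDf.write.format("orc").mode("overwrite").partitionBy("deptno").save("data/emp/")

For an older Spark version, I assume the fallback would be to overwrite each affected partition directory directly, something like:

// Fallback sketch for Spark < 2.3: write each affected partition path directly.
val depts = mergedDf.select("deptno").distinct().collect().map(_.get(0))
depts.foreach { d =>
  mergedDf.filter(col("deptno") === d)
    .drop("deptno")    // the directory name already encodes deptno=<d>
    .write.format("orc").mode("overwrite")
    .save(s"data/emp/deptno=$d")
}

Either way, I understand that reading data/emp/ and overwriting the same path in one job can be unsafe, so mergedDf may need to be persisted or checkpointed (or written to a staging location) first. Is this the right approach, or is there a better way?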