
I have a historical dataframe, partitioned by deptno.

df1:

+-----+------+---------+----+----------+-------+-------+------+
|empno| ename|      job| mgr|  hiredate|    sal|   comm|deptno|
+-----+------+---------+----+----------+-------+-------+------+
| 7499| ALLEN| SALESMAN|7698|1981-02-20|1600.00| 300.00|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22|1250.00| 500.00|    30|
| 7654|MARTIN| SALESMAN|7698|1981-09-28|1250.00|1400.00|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01|2850.00|   null|    30|
| 7844|TURNER| SALESMAN|7698|1981-09-08|1500.00|   0.00|    30|
| 7900| JAMES|    CLERK|7698|1981-12-03| 950.00|   null|    30|
| 7369| SMITH|    CLERK|7902|1980-12-17| 800.00|   null|    20|
| 7566| JONES|  MANAGER|7839|1981-04-02|2975.00|   null|    20|
| 7788| SCOTT|  ANALYST|7566|1982-12-09|3000.00|   null|    20|
| 7876| ADAMS|    CLERK|7788|1983-01-12|1100.00|   null|    20|
| 7782| CLARK|  MANAGER|7839|1981-06-09|2450.00|   null|    10|
| 7839|  KING|PRESIDENT|7782|1981-11-17|5000.00|   null|    10|
+-----+------+---------+----+----------+-------+-------+------+


I receive updates in another dataframe.

df2:

+-----+-----+---------+----+----------+--------+----+------+
|empno|ename|      job| mgr|  hiredate|     sal|comm|deptno|
+-----+-----+---------+----+----------+--------+----+------+
| 7839| KING|   Leader|7782|1981-11-17|15000.00|null|    10|
+-----+-----+---------+----+----------+--------+----+------+

Now I want to update the old record (empno=7839) with the new, updated record from df2 by overwriting only the respective partition (deptno=10). The expected result is:

+-----+------+---------+----+----------+--------+-------+------+
|empno| ename|      job| mgr|  hiredate|     sal|   comm|deptno|
+-----+------+---------+----+----------+--------+-------+------+
| 7499| ALLEN| SALESMAN|7698|1981-02-20| 1600.00| 300.00|    30|
| 7566| JONES|  MANAGER|7839|1981-04-02| 2975.00|   null|    20|
| 7900| JAMES|    CLERK|7698|1981-12-03|  950.00|   null|    30|
| 7839|  KING|   Leader|7782|1981-11-17|15000.00|   null|    10|
| 7369| SMITH|    CLERK|7902|1980-12-17|  800.00|   null|    20|
| 7782| CLARK|  MANAGER|7839|1981-06-09| 2450.00|   null|    10|
| 7876| ADAMS|    CLERK|7788|1983-01-12| 1100.00|   null|    20|
| 7844|TURNER| SALESMAN|7698|1981-09-08| 1500.00|   0.00|    30|
| 7788| SCOTT|  ANALYST|7566|1982-12-09| 3000.00|   null|    20|
| 7654|MARTIN| SALESMAN|7698|1981-09-28| 1250.00|1400.00|    30|
| 7698| BLAKE|  MANAGER|7839|1981-05-01| 2850.00|   null|    30|
| 7521|  WARD| SALESMAN|7698|1981-02-22| 1250.00| 500.00|    30|
+-----+------+---------+----+----------+--------+-------+------+

I have the solution below, but it overwrites all partitions instead of only the modified ones.

import org.apache.spark.sql.functions.coalesce

// Historical data, partitioned by deptno
val df1 = spark.read.orc("data/emp/")

// Updates pulled from MySQL
val finalQry = "SELECT * FROM emp"
val df2 = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://localhost:3306/test")
  .option("user", "root")
  .option("dbtable", "(" + finalQry + ") as t")
  .load()

// For every column, prefer the updated value from df2 and fall back to df1
val projections = df1.schema.fields.map { field =>
  coalesce(df2.col(field.name), df1.col(field.name)).as(field.name)
}

val finalDf = df1.join(df2, df1.col("empno") === df2.col("empno"), "fullouter").select(projections: _*)
finalDf.write.format("orc").mode("overwrite").partitionBy("deptno").save("data/emp/")

Considering the huge size of the historical data, I need a solution that finds the partitions containing updated records and overwrites only those.

  • Does this answer your question? [Overwrite specific partitions in spark dataframe write method](https://stackoverflow.com/questions/38487667/overwrite-specific-partitions-in-spark-dataframe-write-method) – Vincent Doba Sep 07 '21 at 16:37
  • No, the referenced question is about overwriting a partition; my question is about finding the right partitions to overwrite. – Aravind Kumar Anugula Sep 07 '21 at 17:04

2 Answers

1

Set the property below:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

If this property is not set, all existing partitions are deleted and only the new partitions are added. For example, if the existing table has partitions p1 and p2 and you write new data containing partitions p2 and p3, the resulting location will contain only p2 and p3; p1 is removed. If the property is set to dynamic, the resulting partitions will be p1, p2, and p3.
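A minimal sketch of what that could look like, assuming Spark 2.3+ and reusing df2 and finalDf from the code in the question; note that the left-semi join restricting the write to the departments present in df2 is an extra step, not something the property does on its own:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

// Restrict the write to the departments that actually received updates,
// i.e. the deptno values present in df2 (here only deptno=10).
val changedDepts = df2.select("deptno").distinct()
val changedRows  = finalDf.join(changedDepts, Seq("deptno"), "leftsemi")

// With dynamic mode, only the partitions present in changedRows are replaced;
// all other deptno directories under data/emp/ stay as they are.
// Writing back to the same path df1 was read from may require
// materialising changedRows first (e.g. caching or checkpointing it).
changedRows.write
  .format("orc")
  .mode("overwrite")
  .partitionBy("deptno")
  .save("data/emp/")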

0

You know which partitions need overwriting, so if you generate a dataframe with only the data for each deptno partition, you can overwrite each partition by specifying the partition in the path:

partition10Df.write.format("orc").mode("overwrite").save("data/emp/deptno=10")
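A rough sketch of that approach, reusing df2 and finalDf from the question; collecting the distinct deptno values from the update dataframe tells you which partitions need rewriting:

import org.apache.spark.sql.functions.col

// Departments that actually received updates, taken from the update dataframe
val updatedDepts = df2.select("deptno").distinct().collect().map(_.get(0))

updatedDepts.foreach { dept =>
  finalDf
    .filter(col("deptno") === dept)
    .drop("deptno")   // the partition value is already encoded in the directory name
    .write
    .format("orc")
    .mode("overwrite")
    .save(s"data/emp/deptno=$dept")
}

Dropping deptno before the write keeps the files consistent with what partitionBy("deptno") originally produced, since the partition value lives only in the directory name and is added back by partition discovery when data/emp/ is read.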