2

I am trying to insert spark DF to Postgres using JDBC write. The postgres table has a unique constraint on one of the columns, when the df to be inserted violates the constraint entire batch is rejected and spark session closes giving an error duplicate key value violates unique constraint which is correct as the data is duplicate (already exists in the database) org.postgresql.jdbc.BatchResultHandler.handleError(BatchResultHandler.java:148

What is needed that the data rows which do not violate the constraint be inserted and the failed row be ignored, without failing the entire batch.

The code used is:

mode = "Append"
url = "jdbc:postgresql://IP/DB name"
properties = {"user": "username", "password": "password"} 
DF.write
.option("numPartitions",partitions_for_parallelism)
.option("batchsize",batch_size)
.jdbc(url=url, table="table name", mode=mode, properties=properties)

How can I do this?

Mark Rotteveel
  • 100,966
  • 191
  • 140
  • 197
KVS
  • 51
  • 2
  • 4
  • Without implementing a custom write to the database as part of forEachPartition it is not possible to skip failed batches. If you can change the table constraint, better to drop the constraint then run deduplication logic as a part of SQL query. – David Greenshtein Jul 31 '18 at 12:59
  • @DavidGreenshtein Can you clarify where you would run de-duplication? Thx. I have my own opinion, but interested in yours. – thebluephantom Jul 31 '18 at 14:43
  • @David Greenshtein:Thanks for the suggestion. On using forEachPartition I get an error : row type not iterable . While I can find few examples using scala but there seems to be no pyspark equivalent code. – KVS Aug 01 '18 at 04:43

2 Answers2

1

Unfortunately, there is no out of the box solution by Spark. There is a number of possible solutions I see:

  1. Implement business logic of conflict resolution in PostgreSQL database as part of the forEachPartition function. For example, catch the exception of the constraint violation then report to the log.

  2. Drop the constraint on PostgreSQL database, use autogenerated PK means enable to store duplicated rows in the database. Deduplication logic may be further implemented as a part of each SQL query or running deduplication on a daily/hourly basis. You can see example here.

  3. In case there is no other system or process writing to PostgreSQL table except your Spark job it is possible to do filter using the join operation to remove all existing rows from Spark Dataframe before spark.write something like this

I hope my ideas would be helpful.

  • So what does the forEachPartiton logic do? Cannot gauge from your 2nd point. @David Greenshtein – thebluephantom Aug 01 '18 at 09:25
  • in case the constraint field defined in PostgreSQL has not a huge cardinality the idea is to repartition the data according to the constraint defined in PostgreSQL before forEachPartition -> prepare a bulk which will contain the rows having the same constraint value -> write bulk to the database -> if failed log and continue to the next bulk – David Greenshtein Aug 01 '18 at 09:59
  • I guess I would need to see the logic. I think I get it but not the approach I think I would have come up with. Interesting. – thebluephantom Aug 01 '18 at 10:05
0

That is not possible if you have a unique constraint on the target. There is no UPSert mode currently with these techniques. You need to design around this aspect.

thebluephantom
  • 16,458
  • 8
  • 40
  • 83
  • Thank you for your help but basically I am not looking for a solution like Upsert, No record has to updated in case the record is duplicate . Looking for something like what SSIS does marks the failed row as error and inserts all other in the batch – KVS Aug 01 '18 at 04:35
  • I get that but it will not fly unless you do it like the other person stated which is totally different than your original approach. So I see younthink you should check for existency prior to write. Interesting, keen to note your eventual solution – thebluephantom Aug 01 '18 at 05:29
  • What if a very large target? – thebluephantom Aug 01 '18 at 05:46
  • I would not be checking if the data row exists or not since this will be bottleneck in performance .Still looking for a balanced soluion though ..Hope to find one soon – KVS Aug 01 '18 at 06:06
  • My sentiments exactly. So I am not sure what David is proposing. My bet is no unique constraint and deduplicate at target periodically. Pls let me know how u resolve. – thebluephantom Aug 01 '18 at 06:55