
I am using Apache Spark DataFrames to join two data sources and get the result as another DataFrame. I want to write the result to a Postgres table. I see this option:

myDataFrame.write.jdbc(url, table, connectionProperties)

But what I want to do is UPSERT the DataFrame into the table based on the table's primary key. How can this be done? I am using Spark 1.6.0.


4 Answers


It is not supported. DataFrameWriter can either append to or overwrite an existing table. If your application requires more complex logic, you'll have to deal with this manually.

One option is to use an action (foreach, foreachPartition) with a standard JDBC connection. Another is to write to a temporary (staging) table and handle the upsert directly in the database.
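For the second option, a minimal sketch, assuming Postgres 9.5+ (for ON CONFLICT), a hypothetical staging table my_table_staging, and a target table my_table with primary key id, might look like this:

import java.sql.DriverManager

// Overwrite a staging table on every run, then let Postgres perform the upsert.
// Table and column names (my_table, my_table_staging, id, value) are examples only.
myDataFrame.write
  .mode("overwrite")
  .jdbc(url, "my_table_staging", connectionProperties)

val conn = DriverManager.getConnection(url, connectionProperties)
try {
  conn.createStatement().executeUpdate(
    """INSERT INTO my_table (id, value)
      |SELECT id, value FROM my_table_staging
      |ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value""".stripMargin)
} finally {
  conn.close()
}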

See also SPARK-19335 (Spark should support doing an efficient DataFrame Upsert via JDBC) and related proposals.

zero323
  • Also, how to overwrite an existing jdbc table? I can only see the option of df.write.mode().saveAsTable(), but this doesn't seem to support jdbc tables – void Jan 07 '16 at 06:59
  • dataframe.write.mode(SaveMode.Overwrite) – Aydin K. Feb 13 '18 at 12:43
  • @zero323 how to `append` a dataframe to existing postgres table? – saadi Jul 17 '19 at 11:06
  • @saadi df4.write.mode('append').format("jdbc") \ .option("url", "jdbc:postgresql://your_host.com/your_db") \ .option("dbtable", "public.table_name") \ .option("user", "your_user") \ .option("password", "your_password") \ .save() – alvaro nortes Dec 31 '19 at 10:49

KrisP has the right of it. The best way to do an upsert is not through a prepared statement executed row by row. It's important to note that that approach will insert one row at a time, with as many partitions as you have workers. If you want to do this in batches, you can as well:

import java.sql._

// numPartitions and batchSize are placeholders: choose values suited to your cluster
dataframe.coalesce(numPartitions).foreachPartition { partition =>
  // one connection and one prepared statement per partition
  val dbc: Connection = DriverManager.getConnection("JDBCURL")
  val st: PreparedStatement = dbc.prepareStatement("YOUR PREPARED STATEMENT")

  partition.grouped(batchSize).foreach { batch =>
    batch.foreach { row =>
      st.setDouble(1, row.getDouble(1)) // bind each ? parameter from the Row
      st.addBatch()
    }
    st.executeBatch()
  }
  dbc.close()
}

This will execute the inserts in batches on each worker and close the DB connection when the partition is done. It gives you control over how many workers you use and how many rows go into each batch, and lets you work within those constraints.
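For Postgres 9.5 and later, the "YOUR PREPARED STATEMENT" placeholder above could be an upsert; a hypothetical example (table and column names are illustrative, not from the original answer):

// Hypothetical upsert for a table my_table with primary key id;
// bind each ? with st.setXxx(...) before calling st.addBatch().
val upsertSql =
  """INSERT INTO my_table (id, value)
    |VALUES (?, ?)
    |ON CONFLICT (id) DO UPDATE SET value = EXCLUDED.value""".stripMargin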

jstuartmill
  • Can "Update" sql statement be used here to update the single row in database table? – ajay_t Aug 16 '18 at 19:20
  • Yep, the prepared statement can be an insert. – jstuartmill Aug 18 '18 at 04:14
  • 2
    What's the purpose of the `mapPartitions(d => Iterator(d))`? Is that the same as `foreachPartition`? – shridharama May 21 '19 at 23:03
  • 2
    @jstuartmill i tried your approach . I got error for `Iterator ` this is the error `Unable to find encoder for type Iterator[org.apache.spark.sql.Row]. An implicit Encoder[Iterator[org.apache.spark.sql.Row]] is needed to store Iterator[org.apache.spark.sql.Row] instances in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.` spark version is 2.4.4 – PriyalChaudhari Nov 01 '19 at 04:50

If you are going to do it manually and via option 1 mentioned by zero323, you should take a look at the Spark source code for the insert statement here:

  def insertStatement(conn: Connection, table: String, rddSchema: StructType): PreparedStatement = {
    val columns = rddSchema.fields.map(_.name).mkString(",")
    val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
    val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders)"
    conn.prepareStatement(sql)
  }

The PreparedStatement is part of java.sql and has methods like execute() and executeUpdate(). You still have to modify the SQL accordingly, of course.
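As a sketch of such a modification for Postgres 9.5+, an upsert variant of the statement above could be built like this (the pkColumn parameter is an assumption for illustration, not part of the Spark source):

import java.sql.{Connection, PreparedStatement}
import org.apache.spark.sql.types.StructType

// Sketch only: builds an INSERT ... ON CONFLICT ... DO UPDATE statement.
// pkColumn is assumed to be the table's primary key column.
def upsertStatement(conn: Connection, table: String, rddSchema: StructType,
                    pkColumn: String): PreparedStatement = {
  val columns = rddSchema.fields.map(_.name).mkString(",")
  val placeholders = rddSchema.fields.map(_ => "?").mkString(",")
  val updates = rddSchema.fields.map(_.name)
    .filter(_ != pkColumn)
    .map(c => s"$c = EXCLUDED.$c")
    .mkString(",")
  val sql = s"INSERT INTO $table ($columns) VALUES ($placeholders) " +
    s"ON CONFLICT ($pkColumn) DO UPDATE SET $updates"
  conn.prepareStatement(sql)
}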

KrisP

To insert via JDBC you can use

dataframe.write.mode(SaveMode.Append).jdbc(jdbc_url,table_name,connection_properties)

Also, DataFrame.write gives you a DataFrameWriter, which has some methods for inserting the DataFrame:

def insertInto(tableName: String): Unit

Inserts the content of the DataFrame to the specified table. It requires that the schema of the DataFrame is the same as the schema of the table.

Because it inserts data to an existing table, format or options will be ignored.

http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrameWriter
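For illustration (not from the original answer), appending a DataFrame to a table that is registered in the session catalog could look like this; note that insertInto matches columns by position, not by name:

import org.apache.spark.sql.SaveMode

// "target_table" is a hypothetical table known to the catalog (e.g. a Hive table);
// the DataFrame's columns must line up positionally with the table's schema.
dataframe.write.mode(SaveMode.Append).insertInto("target_table")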

There is nothing yet to update individual records out of the box from Spark, though.

Soumitra