3

I have a PostgreSQL database set up with a table and columns already defined. The primary key for the table is a combination of (Id, datetime) column. I need to periodically INSERT data for different Ids from R data.table into the database. However, if data for a particular (Id, datetime) combination already exists it should be UPDATED (overwritten). How can I do this using RPostgres or RPostgreSQL packages?

When I try to insert a data.table where some (Id, datetime) rows already exist I get an error saying the primary key constraint is violated:

dbWriteTable(con, table, dt, append = TRUE, row.names = FALSE)

Error in connection_copy_data(conn@ptr, sql, value) : 
  COPY returned error: ERROR:  duplicate key value violates unique constraint "interval_data_pkey"
DETAIL:  Key (id, dttm_utc)=(a0za000000CSdLoAAL, 2018-10-01 05:15:00+00) already exists.
CONTEXT:  COPY interval_data, line 1
ishan
  • 150
  • 13
  • 1
    Question is too broad. Please read up on tutorials/vignettes/books and make an earnest attempt. Then, come back with specific issues on your implementation. – Parfait Aug 12 '19 at 14:50
  • 2
    Insert into a new (or reused but truncated) table and merge insert into the target table. Sorry but this is a pure SQL problem, not an R problem... – R Yoda Aug 12 '19 at 16:05
  • @RYoda Do you mean create a new temporary table and then merge that with the target table? Can you point me to a resource on how to do the "merge insert" operation you mention? – ishan Aug 12 '19 at 16:07
  • See the `merge` stmt doc for PostgreSQL: https://www.postgresql.org/message-id/attachment/23520/sql-merge.html – R Yoda Aug 12 '19 at 16:30
  • I get a syntax error when I try a MERGE statement and an answer here seems to indicate there is no MERGE statement in PostgreSQL? https://stackoverflow.com/questions/49368083/make-merge-on-postgresql-9-5 – ishan Aug 12 '19 at 21:51

1 Answers1

1

You can use my pg package that has upsert functionality, or just grab code for upsert from there: https://github.com/jangorecki/pg/blob/master/R/pg.R#L249 It is basically what others said in comments. Write data into temp table and then insert into destination table using on conflict clause.

pgSendUpsert = function(stage_name, name, conflict_by, on_conflict = "DO NOTHING", techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
    cols = pgListFields(stage_name)
    cols = setdiff(cols, c("run_id","r_timestamp")) # remove techstamp to have clean column list, as the fresh one will be used, if any
    # sql
    insert_into = sprintf("INSERT INTO %s.%s (%s)", name[1L], name[2L], paste(if(techstamp) c(cols, c("run_id","r_timestamp")) else cols, collapse=", "))
    select = sprintf("SELECT %s", paste(cols, collapse=", "))
    if(techstamp) select = sprintf("%s, %s::INTEGER run_id, '%s'::TIMESTAMPTZ r_timestamp", select, get_run_id(), format(Sys.time(), "%Y-%m-%d %H:%M:%OS"))
    from = sprintf("FROM %s.%s", stage_name[1L], stage_name[2L])
    if(!missing(conflict_by)) on_conflict = paste(paste0("(",paste(conflict_by, collapse=", "),")"), on_conflict)
    on_conflict = paste("ON CONFLICT",on_conflict)
    sql = paste0(paste(insert_into, select, from, on_conflict), ";")
    pgSendQuery(sql, conn = conn, .log = .log)
}

#' @rdname pg
pgUpsertTable = function(name, value, conflict_by, on_conflict = "DO NOTHING", stage_name, techstamp = TRUE, conn = getOption("pg.conn"), .log = getOption("pg.log",TRUE)){
    stopifnot(!is.null(conn), is.logical(.log), is.logical(techstamp), is.character(on_conflict), length(on_conflict)==1L)
    name = schema_table(name)
    if(!missing(stage_name)){
        stage_name = schema_table(stage_name)
        drop_stage = FALSE
    } else {
        stage_name = name
        stage_name[2L] = paste("tmp", stage_name[2L], sep="_")
        drop_stage = TRUE
    }
    if(pgExistsTable(stage_name)) pgTruncateTable(name = stage_name, conn = conn, .log = .log)
    pgWriteTable(name = stage_name, value = value, techstamp = techstamp, conn = conn, .log = .log)
    on.exit(if(drop_stage) pgDropTable(stage_name, conn = conn, .log = .log))
    pgSendUpsert(stage_name = stage_name, name = name, conflict_by = conflict_by, on_conflict = on_conflict, techstamp = techstamp, conn = conn, .log = .log)
}
jangorecki
  • 16,384
  • 4
  • 79
  • 160