4

I'm trying to write a PostgreSQL function for table upserts that can be used for any table. My starting point is taken from a concrete function for a specific table type:

CREATE TABLE doodad(id BIGINT PRIMARY KEY, data JSON);
CREATE OR REPLACE FUNCTION upsert_doodad(d doodad) RETURNS VOID AS
  $BODY$
BEGIN
  LOOP
    UPDATE doodad
       SET id = (d).id, data = (d).data
     WHERE id = (d).id;
    IF found THEN
      RETURN;
    END IF;

    -- does not exist, or was just deleted.

    BEGIN
      INSERT INTO doodad SELECT d.*;
      RETURN;
    EXCEPTION when UNIQUE_VIOLATION THEN
      -- do nothing, and loop to try the update again
    END;

  END LOOP;
END;
  $BODY$
LANGUAGE plpgsql;

The dynamic SQL version of this for any table that I've come up with is here: SQL Fiddle

CREATE OR REPLACE FUNCTION upsert(target ANYELEMENT) RETURNS VOID AS
$$
DECLARE
  attr_name NAME;
  col TEXT;
  selectors TEXT[];
  setters TEXT[];
  update_stmt TEXT;
  insert_stmt TEXT;
BEGIN
  FOR attr_name IN SELECT a.attname
                     FROM pg_index i
                     JOIN pg_attribute a ON a.attrelid = i.indrelid 
                                        AND a.attnum = ANY(i.indkey)
                    WHERE i.indrelid = format_type(pg_typeof(target), NULL)::regclass
                      AND i.indisprimary
  LOOP
    selectors := array_append(selectors, format('%1$s = target.%1$s', attr_name));
  END LOOP;

  FOR col IN SELECT json_object_keys(row_to_json(target))
  LOOP
    setters := array_append(setters, format('%1$s = (target).%1$s', col)); 
  END LOOP;

  update_stmt := format(
    'UPDATE %s SET %s WHERE %s',
    pg_typeof(target),
    array_to_string(setters, ', '),
    array_to_string(selectors, ' AND ')
  );
  insert_stmt := format('INSERT INTO %s SELECT (target).*', pg_typeof(target));

  LOOP
    EXECUTE update_stmt; 
    IF found THEN
      RETURN;
    END IF;

    BEGIN
      EXECUTE insert_stmt;
      RETURN;
    EXCEPTION when UNIQUE_VIOLATION THEN
      -- do nothing
    END;
  END LOOP;
END;
$$
LANGUAGE plpgsql;

When I attempt to use this function, I get an error:

SELECT * FROM upsert(ROW(1,'{}')::doodad);

ERROR: column "target" does not exist: SELECT * FROM upsert(ROW(1,'{}')::doodad)

I tried changing the upsert statement to use placeholders, but I can't figure out how to invoke it using the record:

EXECUTE update_stmt USING target;

ERROR: there is no parameter $2: SELECT * FROM upsert(ROW(1,'{}')::doodad)

EXECUTE update_stmt USING target.*;

ERROR: query "SELECT target.*" returned 2 columns: SELECT * FROM upsert(ROW(1,'{}')::doodad)

I feel really close to a solution, but I can't figure out the syntax issues.

Christopher Currie
  • 3,025
  • 1
  • 29
  • 40
  • Why upsert function instead of writable CTE upsert? – Jakub Kania Feb 13 '15 at 20:32
  • Because even the writable CTE is vulnerable to unique contraint violations. http://stackoverflow.com/questions/1109061/insert-on-duplicate-update-in-postgresql/8702291#8702291 and http://dba.stackexchange.com/questions/78510/why-is-cte-open-to-lost-updates for details. So I'd still want a function for the retry loop. I'm open to using the wCTE as the implementation, if it makes a significant performance impact. – Christopher Currie Feb 13 '15 at 21:55

1 Answers1

4

Short answer: you can't.

Variable substitution does not happen in the command string given to EXECUTE or one of its variants. If you need to insert a varying value into such a command, do so as part of constructing the string value, or use USING, as illustrated in Section 40.5.4. 1

Longer answer:

SQL statements and expressions within a PL/pgSQL function can refer to variables and parameters of the function. Behind the scenes, PL/pgSQL substitutes query parameters for such references. 2

This was the first important piece to the puzzle: PL/pgSQL does magic transformations on function parameters that turn them into variable substitutions.

The second was that fields of variable substitutions can referenced:

Parameters to a function can be composite types (complete table rows). In that case, the corresponding identifier $n will be a row variable, and fields can be selected from it, for example $1.user_id. 3

This excerpt confused me, because it referred to function parameters, but knowing that function parameters are implemented as variable substitutions under the hood, it seemed that I should be able to use the same syntax in EXECUTE.

These two facts unlocked the solution: use the ROW variable in the USING clause, and dereference its fields in the dynamic SQL. The results (SQL Fiddle):

CREATE OR REPLACE FUNCTION upsert(v_target ANYELEMENT)
  RETURNS SETOF ANYELEMENT AS
$$
DECLARE
  v_target_name TEXT;
  v_attr_name NAME;
  v_selectors TEXT[];
  v_colname TEXT;
  v_setters TEXT[];
  v_update_stmt TEXT;
  v_insert_stmt TEXT;
  v_temp RECORD;
BEGIN
  v_target_name := format_type(pg_typeof(v_target), NULL);

  FOR v_attr_name IN SELECT a.attname
                     FROM pg_index i
                     JOIN pg_attribute a ON a.attrelid = i.indrelid 
                                        AND a.attnum = ANY(i.indkey)
                    WHERE i.indrelid = v_target_name::regclass
                      AND i.indisprimary
  LOOP
    v_selectors := array_append(v_selectors, format('t.%1$I = $1.%1$I', v_attr_name));
  END LOOP;

  FOR v_colname IN SELECT json_object_keys(row_to_json(v_target))
  LOOP
    v_setters := array_append(v_setters, format('%1$I = $1.%1$I', v_colname));
  END LOOP;

  v_update_stmt := format(
      'UPDATE %I t SET %s WHERE %s RETURNING t.*',
      v_target_name,
      array_to_string(v_setters, ','),
      array_to_string(v_selectors, ' AND ')
  );

  v_insert_stmt = format('INSERT INTO %I SELECT $1.*', v_target_name);

  LOOP
    EXECUTE v_update_stmt INTO v_temp USING v_target;
    IF v_temp IS NOT NULL THEN
      EXIT;
    END IF;

    BEGIN
      EXECUTE v_insert_stmt USING v_target;
      EXIT;
    EXCEPTION when UNIQUE_VIOLATION THEN
      -- do nothing
    END;
  END LOOP;
  RETURN QUERY SELECT v_target.*;
END;
$$
LANGUAGE plpgsql;

For writeable CTE fans, this is trivially convertible to CTE form:

v_cte_stmt = format(
    'WITH up as (%s) %s WHERE NOT EXISTS (SELECT 1 from up t WHERE %s)',
    v_update_stmt,
    v_insert_stmt,
    array_to_string(v_selectors, ' AND '));

LOOP
  BEGIN
    EXECUTE v_cte_stmt USING v_target;
    EXIT;
  EXCEPTION when UNIQUE_VIOLATION THEN
    -- do nothing
  END;
END LOOP;
RETURN QUERY SELECT v_target.*;

NB: I have done zero performance testing on this solution, and I am relying on the analysis of others for its correctness. For now it appears to run correctly on PostgreSQL 9.3 in my development environment. YMMV.

Christopher Currie
  • 3,025
  • 1
  • 29
  • 40