
I am looking at ideas for building an ETL pipeline to populate PostgreSQL tables. I expect to be loading tens of thousands of rows in a batch, so not huge amounts of data, but big enough to think through before implementing anything.

I expect that in the vast majority of cases, the incoming data will be clean and should insert with no issue. However, I would not be surprised if every now and then I get one or two records in a batch with a problem: perhaps a duplicate key, or a char field longer than expected, or a missing field in a non-null column. That type of thing.

When this happens, I would like all of the non-erroring rows to be inserted successfully and my code to somehow determine which row(s) failed.

I can do this by inserting each record in a separate transaction, but that would be very inefficient for the vast majority of cases, where all records are expected to insert cleanly.

What tools and strategies exist to accomplish these goals?

Nevo
  • Using [SAVEPOINT](https://www.postgresql.org/docs/current/sql-savepoint.html) within a plpgsql loop with exception handling will help in your case. Have a look at the answer by @LaurenzAlbe to [this](https://stackoverflow.com/questions/42638907/savepoint-in-postgresql-function) thread. – Stefanov.sm Oct 08 '21 at 17:12
  • With the exception of duplicates, you can validate the data before inserting. For duplicates use an [`on conflict` clause](https://www.postgresql.org/docs/13/sql-insert.html#SQL-ON-CONFLICT) (a small sketch follows these comments). – Schwern Oct 08 '21 at 19:44
  • @Schwern I have in mind cases where incoming data needs non-trivial mapping into more than one table and may have various format and/or semantic faults. Hence the pattern: let it bang and then react. Of course this does not exclude your suggestion to validate before or while processing and mapping prior to insertion. – Stefanov.sm Oct 09 '21 at 08:46
  • @Stefanov.sm A framework like Rails would provide for such mapping and [validation](https://guides.rubyonrails.org/active_record_validations.html), plus [activerecord-import](https://github.com/zdennis/activerecord-import) to make bulk inserts with validation efficient. – Schwern Oct 09 '21 at 20:55
  • @Schwern There are many options of course. – Stefanov.sm Oct 10 '21 at 07:02
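
As a concrete illustration of the `on conflict` suggestion above (a sketch only, assuming a hypothetical target_table with primary key id), duplicate keys can be absorbed without aborting the batch, and RETURNING reveals which rows were actually inserted:

INSERT INTO target_table (id, payload)
VALUES (1, 'first row'),
       (2, 'second row')
ON CONFLICT (id) DO NOTHING   -- rows that collide with existing keys are skipped, not errors
RETURNING id;                 -- only the ids that were actually inserted come back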

1 Answer


I would suggest two options.

A. Here is some pseudocode for the logic tier; I hope it is clear.

open the data file;
issue 'BEGIN TRANSACTION' to the DB;
while (there are lines left to read from the data file)
{
 read a line;
 try 
 {
   issue 'SAVEPOINT <name>' to the DB;
   process and insert the line into the target DB tables;  // This may bang
   issue 'RELEASE SAVEPOINT <name>' to the DB;
 }
 catch (exception ex)
 {
   issue 'ROLLBACK TO SAVEPOINT <name>' to the DB;
   handle the unfortunate event;
   log information about the failing line;
 }
}
issue 'COMMIT TRANSACTION' to the DB;
close the data file;

Meanwhile, if something catastrophic happens, you can issue ROLLBACK TRANSACTION to the DB as usual.
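
To make the savepoint mechanics concrete, here is a minimal sketch of the SQL the logic tier ends up issuing for a two-row batch, assuming a hypothetical table target_table(id int primary key, payload text) in which the second row hits a duplicate key:

BEGIN;

SAVEPOINT row_1;
INSERT INTO target_table (id, payload) VALUES (1, 'first row');  -- succeeds
RELEASE SAVEPOINT row_1;

SAVEPOINT row_2;
INSERT INTO target_table (id, payload) VALUES (1, 'oops');       -- fails: duplicate key
ROLLBACK TO SAVEPOINT row_2;  -- discards only the failed row; the transaction stays usable

COMMIT;  -- the first row is persisted; the failed row was logged by the client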

B. Another approach would be to do the same in a plpgsql function. I did this some time ago and it worked very well:

  • Create a foreign table over the data file in the pg_temp schema (i.e. a temporary foreign table, so that the solution is safe for concurrent use); a DDL sketch follows this list;
  • The foreign table should have a single column of type text;
  • Then read it line by line in an implicit cursor loop.
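
For illustration, the foreign table could be created with the file_fdw extension that ships with PostgreSQL along these lines; the server name, table name and file path are placeholders, and the single-text-column trick assumes the file contains no tab or backslash characters (otherwise adjust the format options):

CREATE EXTENSION IF NOT EXISTS file_fdw;
CREATE SERVER IF NOT EXISTS batch_file_server FOREIGN DATA WRAPPER file_fdw;

CREATE FOREIGN TABLE pg_temp.the_foreign_table (
  the_line_column text
) SERVER batch_file_server
  OPTIONS (filename '/path/to/batch_file.txt', format 'text');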

Some more plpgsql-like pseudocode.

for running_line in select the_line_column from pg_temp.the_foreign_table loop
 begin
  -- process and insert running_line into the target tables;
 exception when others then
  -- handle the unfortunate event;
 end;
end loop;
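
Filling in that pseudocode, a minimal sketch of such a function might look like the following; the semicolon-splitting and the target table target_table(id, payload) are placeholders for the real mapping logic:

create function pg_temp.load_batch() returns void
language plpgsql as
$$
declare
  running_line text;
begin
  for running_line in
    select the_line_column from pg_temp.the_foreign_table
  loop
    begin
      -- process and insert the line; this parsing is only a placeholder
      insert into target_table (id, payload)
      values (split_part(running_line, ';', 1)::int,
              split_part(running_line, ';', 2));
    exception when others then
      -- the failing line is skipped; record it for later inspection
      raise notice 'rejected line "%": %', running_line, sqlerrm;
    end;
  end loop;
end;
$$;

The function would then be invoked once per batch with select pg_temp.load_batch();.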

Both approaches worked well for me; the latter is very fast.

Stefanov.sm