I need a way to reject the pg-promise transaction in this code:

db.tx(async t => {
    // Pass values as query parameters instead of interpolating them into the SQL string
    const file = await t.one('insert into ui.user_datasets_files (user_dataset_id, filename) values ($1, $2) RETURNING id', [itemId, fileName]);
    const data = rows.map(row => ({
        user_dataset_id: itemId,
        file_id: file.id,
        json_data: JSON.stringify(row)
    }));
    const insert = pgPromise.helpers.insert(data, dataset_data_columns);
    return t.none(insert);
}).then(() => callback()).catch(err => callback(err));

This line takes a long time, and the user can end the connection while it is running:

return t.none(insert);

So I want a way to stop the execution and roll back the transaction from inside this event handler:

 req.on('close', function () {
    promise.reject('Connection Closed');
});
vitaly-t
  • 1) How many rows of data are you trying to insert in a single operation? 2) `this line takes long time` - how long, be specific? – vitaly-t May 15 '18 at 11:30
  • oh, that's about thousands of rows – Akram Kamal Qassas May 15 '18 at 12:00
  • Do not use `callback` parameters (and [not like this](https://stackoverflow.com/q/24662289/1048572)!). Just return the promise from your function. – Bergi May 15 '18 at 12:54
  • `return Promise.race([t.none(insert), new Promise((resolve, reject) => req.on('close', reject))])` should do it. – Bergi May 15 '18 at 12:56
  • @Bergi This isn't gonna work, because the long insert will continue holding the IO open, and thus the process running, even though the transaction itself will stop. The insert itself needs to be paginated/throttled to prevent this. – vitaly-t May 15 '18 at 13:01
  • @vitaly-t Wouldn't the database kill all IO operations once the transaction ends? – Bergi May 15 '18 at 13:38
  • @Bergi There is a way to kill all IO operations in the process, but that hack is outside of the database transaction, and a nasty one at that. As far as the database IO is concerned, it cannot be released while there is an atomic-query operation running, which that insert exactly is. – vitaly-t May 15 '18 at 13:42

1 Answer

Paginate/throttle the inserts, and between inserts check whether the transaction needs to be interrupted. If it does, throw an error, and the transaction will end with a rollback.

So for example, instead of inserting 10,000 rows in a single insert, you can do 10 inserts of 1,000 rows each. It will execute just a tad slower, but it makes your transaction interruptible without a large delay: at worst you wait for the current 1,000-row insert to finish.

You can paginate through data either via sequence, as shown in Data Imports, or via a simple loop, if all data is in memory.

In my code, all rows are in memory. How can I paginate the inserts?

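db.tx(async t => {
    const pageSize = 1000; // rows per insert; tune as needed

    // `data` is the full in-memory array of rows to insert
    for (let i = 0; i < data.length; i += pageSize) {
        // get the next set of rows from memory
        const page = data.slice(i, i + pageSize);
        const insert = pgPromise.helpers.insert(page, dataset_data_columns);
        await t.none(insert);

        // `interrupted` is assumed to be a flag set elsewhere, e.g. in req.on('close')
        if (interrupted) {
            throw new Error('Interrupting transaction'); // rejects the transaction => ROLLBACK
        }
    }
}).then(() => callback()).catch(err => callback(err));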
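
One way to connect this loop to the `req.on('close')` handler from the question is a shared flag that the handler sets and the loop checks between inserts. The following is only a minimal sketch of that idea, written so it runs without a database: `insertInPages`, `interrupted`, `fakeInsert`, and `demoRows` are hypothetical names, and an `EventEmitter` stands in for the real request object.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: call `insertPage` once per page of rows, and throw
// between pages as soon as `isInterrupted()` returns true. When run inside
// db.tx(), the thrown error rejects the callback's promise, which ends the
// transaction with a rollback.
async function insertInPages(rows, pageSize, insertPage, isInterrupted) {
    for (let i = 0; i < rows.length; i += pageSize) {
        await insertPage(rows.slice(i, i + pageSize));
        if (isInterrupted()) {
            throw new Error('Interrupting transaction');
        }
    }
}

// Stand-in for the HTTP request (assumption: the real `req` emits 'close' too).
const req = new EventEmitter();
let interrupted = false;
req.on('close', () => { interrupted = true; });

// Stand-in for t.none(insert): records page sizes instead of touching a database.
const executed = [];
const fakeInsert = async page => {
    executed.push(page.length);
    if (executed.length === 2) {
        req.emit('close'); // simulate the client disconnecting during the second insert
    }
};

const demoRows = Array.from({ length: 5000 }, (_, i) => ({ n: i }));
const demo = insertInPages(demoRows, 1000, fakeInsert, () => interrupted)
    .then(() => 'completed', err => err.message);

// Only 2 of the 5 pages run before the loop stops.
demo.then(outcome => console.log(outcome, executed));
```

With pg-promise, the same helper could presumably be called as `db.tx(t => insertInPages(data, 1000, page => t.none(pgPromise.helpers.insert(page, dataset_data_columns)), () => interrupted))`. The flag is only checked between inserts, so an insert that is already running still completes before the rollback.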
vitaly-t