Here is the program code:

jobQueueManager.addSource = (source) => {
    console.log('addSource', 'source', source);

    return new P(
        (resolve) => {
            connection.beginTransaction((errorTransaction) => {
                if (errorTransaction) {
                    throw new Error('Error starting a transaction.');
                }

                connection.query('SELECT `id` FROM `source` WHERE `country_code` = ? AND `nid` = ?', [
                        source.countryCode,
                        source.nid
                    ], (errorSelect, rows) => {
                    if (errorSelect) {
                        console.log('errorSelect', errorSelect);

                        throw new Error('Error selecting a source.');
                    }

                    // From the second call of .addSource with the same data onwards, this
                    // should find the row. It does not, because the second "select" runs
                    // before the first transaction is committed.
                    console.log('select source', 'rows', rows);

                    if (rows.length === 0) {
                        connection.query('INSERT INTO `source` SET ?', {
                            country_code: source.countryCode,
                            nid: source.nid
                        }, (errorInsert, result) => {
                            if (errorInsert) {
                                throw new Error('Error inserting a source.');
                            }

                            console.log('insert source', 'source', source);

                            resolve(result.insertId);
                        });
                    } else {
                        resolve(rows[0].id);
                    }
                });
            });
        })
        .then((sourceId) => {
            return new P((resolve) => {
                connection.commit((errorCommit) => {
                    console.log('commit source');
                    if (errorCommit) {
                        throw new Error('Error committing a transaction.');
                    }

                    resolve({
                        id: sourceId,
                        ...source
                    });
                });
            });
        })
        .tap((sourceEntity) => {
            console.log('sourceEntity', sourceEntity);
        });
};

Here are the instructions to execute the program:

Promise
    .all([
        jobQueueManager
            .addSource({
                countryCode: 'uk',
                nid: 'foo'
            }),
        jobQueueManager
            .addSource({
                countryCode: 'uk',
                nid: 'foo'
            })
        ,
        jobQueueManager
            .addSource({
                countryCode: 'uk',
                nid: 'foo'
            })
    ]);

Here is the output:

addSource source { countryCode: 'uk', nid: 'foo' }
addSource source { countryCode: 'uk', nid: 'foo' }
addSource source { countryCode: 'uk', nid: 'foo' }
select source rows []
select source rows []
select source rows []
insert source source { countryCode: 'uk', nid: 'foo' }
insert source source { countryCode: 'uk', nid: 'foo' }
insert source source { countryCode: 'uk', nid: 'foo' }
commit source
sourceEntity { id: 1, countryCode: 'uk', nid: 'foo' }
commit source
sourceEntity { id: 2, countryCode: 'uk', nid: 'foo' }
commit source
sourceEntity { id: 3, countryCode: 'uk', nid: 'foo' }

I want the execution order to be:

  • select, insert;
  • select;
  • select;

but I am getting:

  • select;
  • select;
  • select;
  • insert;
  • insert;
  • insert;
Gajus
  • It doesn't look like `.all` does what you need in this case. – Kevin B Nov 10 '15 at 16:15
  • Well, don't call `addSource` thrice at once (which will let them run in parallel)? Use `then` to chain instead. – Bergi Nov 10 '15 at 16:17
  • You really really should [promisify at the lowest level](http://stackoverflow.com/q/22519784/1048572), which is the `connection` and its method in your case. Your `addSource` code is using horrible callback-style while it should embrace promises. And `throw`ing from async callbacks is an antipattern. – Bergi Nov 10 '15 at 16:18
  • @Bergi There isn't a bulletproof way to promisify a node-mysql connection. There is an entire discussion about it here: https://github.com/felixge/node-mysql/issues/929 – Gajus Nov 10 '15 at 16:22
  • @GajusKuizinas: Well even if `Promise.promisifyAll` doesn't work easily for some reason and you have to manually promisify, you still should promisify the single methods. Your `addSource` function lacks any abstraction. – Bergi Nov 10 '15 at 16:29
  • @Bergi What is the correct way to promisify node-mysql? – Gajus Nov 10 '15 at 16:32
  • @GajusKuizinas: I don't know, I didn't look closely enough, though Petka's advice to use `Promise.promisifyAll` is sound. If there are any problems with that, you might want to ask a separate question. All I'm saying is that the promisification should not happen inside `addSource` (where it would need to be repeated in every other method that uses the `connection`); it should be abstracted out, and `addSource` should only call functions that return promises. – Bergi Nov 10 '15 at 16:36
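
The two suggestions in the comments above (promisify the connection methods once, and let the caller chain the calls with `then`) can be sketched roughly as follows. This is a minimal illustration, not the asker's code: `createFakeConnection`, its `select` and `insert` methods, and `addSourcesInOrder` are hypothetical names, and the fake connection is an in-memory stand-in so the sketch runs without MySQL. It uses Node's built-in `util.promisify` rather than Bluebird, and it leaves the transaction handling out.

```javascript
const util = require('util');

// Hypothetical in-memory stand-in for a database connection. Only the
// error-first callback shape is meant to resemble node-mysql.
const createFakeConnection = () => {
    const rows = [];

    return {
        select: (countryCode, nid, callback) => {
            setImmediate(() => {
                callback(null, rows.filter((row) => {
                    return row.countryCode === countryCode && row.nid === nid;
                }));
            });
        },
        insert: (countryCode, nid, callback) => {
            setImmediate(() => {
                rows.push({id: rows.length + 1, countryCode, nid});
                callback(null, {insertId: rows.length});
            });
        }
    };
};

const connection = createFakeConnection();

// Promisify once, at the lowest level. Errors passed to the callbacks become
// rejections instead of exceptions thrown inside an asynchronous callback.
const select = util.promisify(connection.select);
const insert = util.promisify(connection.insert);

// addSource now only composes promise-returning functions.
const addSource = (source) => {
    return select(source.countryCode, source.nid)
        .then((found) => {
            if (found.length > 0) {
                return found[0].id;
            }

            return insert(source.countryCode, source.nid)
                .then((result) => result.insertId);
        })
        .then((id) => ({id, ...source}));
};

// The caller chains the calls: each addSource starts only after the
// previous one has fulfilled.
const addSourcesInOrder = (sources) => {
    return sources.reduce((previous, source) => {
        return previous.then((entities) => {
            return addSource(source).then((entity) => entities.concat([entity]));
        });
    }, Promise.resolve([]));
};
```

Calling `addSourcesInOrder` with the same source three times runs select, insert, select, select against the fake connection, so all three entities share one id. Whether sequencing belongs in the caller at all is exactly what the rest of the discussion disputes.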

1 Answer

The only way I have managed to do this is using an execution queue, e.g.

jobQueueManager.addSource = (source) => {
    let newSource;

    newSource = P.all(jobQueueManager.addSource.executionQueue).then(() => {
        return new P(
            (resolve) => {
                // ...
            });
    });

    jobQueueManager.addSource.executionQueue.push(newSource);

    return newSource;
};

jobQueueManager.addSource.executionQueue = [];

This produces the expected behaviour:

select source rows []
insert source source { countryCode: 'uk', nid: 'foo' }
commit source
sourceEntity { id: 1, countryCode: 'uk', nid: 'foo' }
select source rows [ RowDataPacket { id: 1 } ]
commit source
sourceEntity { id: 1, countryCode: 'uk', nid: 'foo' }
select source rows [ RowDataPacket { id: 1 } ]
commit source
sourceEntity { id: 1, countryCode: 'uk', nid: 'foo' }
Gajus
  • That'll leak memory, holding the promises for all results of every call forever in that array. – Bergi Nov 10 '15 at 16:20
  • @Bergi If that's the only concern, just add `executionQueue = executionQueue.filter((promise) => { return !promise.isFulfilled(); });`. – Gajus Nov 10 '15 at 16:23
  • No, just the most obvious concern :-) Another is that any exception will reject your queue and you never can execute another `addSource` again. There's just too many edge cases to consider. – Bergi Nov 10 '15 at 16:31
  • Why does `addSource` need to serialise the calls itself at all, and forbid any parallel execution? Shouldn't it be the responsibility of the caller to sequence their calls as required? – Bergi Nov 10 '15 at 16:32
  • @Bergi That is the essence of the question. No, I don't think it is the responsibility of the caller (the caller need not be aware of the implementation). – Gajus Nov 10 '15 at 16:48
  • I am trying to understand why execution order is not "1, 2, 3", when I do not enforce sequence execution. Here is non-enforced code and output, https://gist.github.com/gajus/dc6476c2d07fab2f295e#file-program1-js-L8. Here is with enforced sequence, https://gist.github.com/gajus/c27f0c6c6102f8ac1987#file-program-js-L8. (I have rewritten code to use promises.) I am expecting the latter behaviour. – Gajus Nov 10 '15 at 16:48
  • Why would it be "1, 2, 3"? Your function is asynchronous, and you're calling it multiple times, so the executions can be interleaved arbitrarily. Don't you want your processing to be concurrent? – Bergi Nov 10 '15 at 16:52
  • No. I want my "transaction" block to prevent another transaction from starting until the previous transaction has completed. That's a very sane requirement. – Gajus Nov 10 '15 at 16:57
  • Shouldn't you let the database deal with that? – Bergi Nov 10 '15 at 16:58
  • MySQL does not support nested transactions. It is up to the program to enforce them. Ideally, it should be `node-mysql` that does that. It doesn't. – Gajus Nov 10 '15 at 17:04
  • OK, but at least the queue should be bound to the "outer" transaction (in which the sequence needs to be enforced) then, instead of being bound to the static global `addSource` function which might be invoked from multiple points. – Bergi Nov 10 '15 at 17:11
  • Not following. How would that ensure that transaction is enforced within the same connection thread? – Gajus Nov 10 '15 at 17:27
  • Sorry, maybe I oversimplified. You still have working non-nested transactions with MySQL, right? And your application only needs to ensure that the correct sequence is followed at this per-transaction level? – Bergi Nov 10 '15 at 17:33
  • The second question is not clear, but I am sensing the answer is – yes. – Gajus Nov 10 '15 at 17:34
  • I mean that when your program that calls `addSource` multiple times is started within a transaction, you need to ensure the order of operations between these multiple calls. But when your program is started multiple times, from multiple open transactions, there is no requirement on the order of operations between those transactions? – Bergi Nov 10 '15 at 17:42
  • By the way, I have managed to move the transaction-enforcing logic to the mysql library level, https://gist.github.com/gajus/31ae5c90f24e920314cf. This makes the program code a lot more intuitive: https://gist.github.com/gajus/31ae5c90f24e920314cf#file-program-example-js – Gajus Nov 10 '15 at 17:57
  • If other people are following this discussion, this is what happens if you issue a sequence of `START TRANSACTION` statements. http://dev.mysql.com/doc/refman/5.6/en/implicit-commit.html (An implicit commit.) – Gajus Nov 10 '15 at 18:01
  • I have proposed a variation of the above implementation to `node-mysql2-promise`. See discussion at https://github.com/namshi/node-mysql2-promise/issues/6. – Gajus Nov 10 '15 at 18:20