
I have a typical producer-consumer problem:

Multiple producer applications write job requests to a job table in a PostgreSQL database.

The job requests have a state field that is set to QUEUED on creation.

There are multiple consumer applications that are notified by a rule when a producer inserts a new record:

CREATE OR REPLACE RULE "jobrecord.added" AS
  ON INSERT TO jobrecord DO 
  NOTIFY "jobrecordAdded";
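For completeness, each consumer subscribes to that channel in its own database session:

```sql
-- run once per consumer connection; the session then receives an
-- asynchronous notification every time the rule above fires
LISTEN "jobrecordAdded";
```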

They will try to reserve a new record by setting its state to RESERVED. Of course, only one consumer should succeed. All other consumers should not be able to reserve the same record. They should instead reserve other records with state=QUEUED.

Example: some producer added the following records to table jobrecord:

id state  owner  payload
------------------------
1 QUEUED null   <data>
2 QUEUED null   <data>
3 QUEUED null   <data>
4 QUEUED null   <data>

now, two consumers A, B want to process them. They start running at the same time. One should reserve id 1, the other one should reserve id 2; then whichever finishes first should reserve id 3, and so on.

In a pure multithreaded world, I would use a mutex to control access to the job queue, but the consumers are different processes that may run on different machines. They only access the same database, so all synchronization must happen through the database.

I read a lot of documentation about concurrent access and locking in PostgreSQL, e.g. http://www.postgresql.org/docs/9.0/interactive/explicit-locking.html and the questions "Select unlocked row in Postgresql" and "PostgreSQL and locking".

From these topics, I learned, that the following SQL statement should do what I need:

UPDATE jobrecord
  SET owner= :owner, state = :reserved 
  WHERE id = ( 
     SELECT id from jobrecord WHERE state = :queued 
        ORDER BY id  LIMIT 1 
     ) 
  RETURNING id;  -- will only return an id when it was reserved successfully

Unfortunately, when I run this in multiple consumer processes, in about 50% of the time, they still reserve the same record, both processing it and one overwriting the changes of the other.

What am I missing? How do I have to write the SQL statement so that multiple consumers will not reserve the same record?

code_talker
  • you do that in a transaction, isn't it? with a BEGIN and a COMMIT? – regilero Jun 28 '11 at 17:01
  • No, without BEGIN and COMMIT. I tried to do it in a transaction, but then just nothing happens, my consumers do not process anything, the table remains unchanged. Should I use a transaction? – code_talker Jun 29 '11 at 08:09
  • It appears that the problem with the transaction was the way I used it in my consumer application (which is written in Qt). – code_talker Jun 29 '11 at 16:20
  • ok saw your answer below, maybe you could make an edit with the final working code as this is a quite recurrent question – regilero Jun 29 '11 at 16:29
  • If when you did it inside of a transaction but *nothing* ever happened, my guess is that you didn't commit as the tx would rollback in that case. – apinstein Jul 15 '11 at 02:48

7 Answers


I use Postgres for a FIFO queue as well. I originally used ACCESS EXCLUSIVE, which yields correct results under high concurrency, but has the unfortunate effect of being mutually exclusive with pg_dump, which acquires an ACCESS SHARE lock during its execution. This caused my next() function to block for a very long time (the duration of the pg_dump). That was not acceptable, since we are a 24x7 shop and customers didn't like the dead time on the queue in the middle of the night.

I figured there must be a less-restrictive lock which would still be concurrent-safe and not lock while pg_dump is running. My search led me to this SO post.

Then I did some research.

The following modes are sufficient for a FIFO queue NEXT() function which will update the status of a job from queued to running without any concurrency failures, and which also does not block against pg_dump:

SHARE UPDATE EXCLUSIVE
SHARE ROW EXCLUSIVE
EXCLUSIVE

Query:

begin;
lock table tx_test_queue in exclusive mode;
update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1
    )
returning job_id;
commit;

Result looks like:

UPDATE 1
 job_id
--------
     98
(1 row)

Here is a shell script which tests all of the different lock modes at high concurrency (30).

#!/bin/bash
# RESULTS, feel free to repro yourself
#
# noLock                    FAIL
# accessShare               FAIL
# rowShare                  FAIL
# rowExclusive              FAIL
# shareUpdateExclusive      SUCCESS
# share                     FAIL+DEADLOCKS
# shareRowExclusive         SUCCESS
# exclusive                 SUCCESS
# accessExclusive           SUCCESS, but LOCKS against pg_dump

#config
strategy="exclusive"

db=postgres
dbuser=postgres
queuecount=100
concurrency=30

# code
psql84 -t -U $dbuser $db -c "create table tx_test_queue (job_id serial, status text);"
# empty queue
psql84 -t -U $dbuser $db -c "truncate tx_test_queue;";
echo "Simulating 10 second pg_dump with ACCESS SHARE"
psql84 -t -U $dbuser $db -c "lock table tx_test_queue in ACCESS SHARE mode; select pg_sleep(10); select 'pg_dump finished...'" &

echo "Starting workers..."
# queue $queuecount items
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -q -U $dbuser $db -c "insert into tx_test_queue (status) values ('queued');"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
# process $queuecount w/concurrency of $concurrency
case $strategy in
    "noLock")               strategySql="update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessShare")          strategySql="lock table tx_test_queue in ACCESS SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowShare")             strategySql="lock table tx_test_queue in ROW SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "rowExclusive")         strategySql="lock table tx_test_queue in ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareUpdateExclusive") strategySql="lock table tx_test_queue in SHARE UPDATE EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "share")                strategySql="lock table tx_test_queue in SHARE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "shareRowExclusive")    strategySql="lock table tx_test_queue in SHARE ROW EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "exclusive")            strategySql="lock table tx_test_queue in EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    "accessExclusive")      strategySql="lock table tx_test_queue in ACCESS EXCLUSIVE mode; update tx_test_queue set status='running{}' where job_id in (select job_id from tx_test_queue where status='queued' order by job_id asc limit 1);";;
    *) echo "Unknown strategy $strategy";;
esac
echo $strategySql
seq $queuecount | xargs -n 1 -P $concurrency -I {} psql84 -U $dbuser $db -c "$strategySql"
#psql84 -t -U $dbuser $db -c "select * from tx_test_queue order by job_id;"
psql84 -U $dbuser $db -c "select count(distinct(status)) as should_output_100 from tx_test_queue;"
psql84 -t -U $dbuser $db -c "drop table tx_test_queue;";

Code is here as well if you want to edit: https://gist.github.com/1083936

I am updating my application to use the EXCLUSIVE mode since it's the most restrictive mode that a) is correct and b) doesn't conflict with pg_dump. I chose the most restrictive since it seems the least risky in terms of changing the app from ACCESS EXCLUSIVE without being an uber-expert in postgres locking.

I feel pretty comfortable with my test rig and with the general ideas behind the answer. I hope that sharing this helps solve this problem for others.

apinstein
  • I've done a ton more testing on this solution and am 100% convinced it is the correct way to do this. I have deployed it to my production system (dequeue using EXCLUSIVE mode). Enjoy! – apinstein Jul 16 '11 at 22:12
  • Shouldn't require a table lock, only a row lock: http://stackoverflow.com/a/30315387/492548 – mackross May 19 '15 at 01:43

No need to take a whole-table lock for this :\.

A row lock created with for update works just fine.

See https://gist.github.com/mackross/a49b72ad8d24f7cefc32 for the change I made to apinstein's answer and verified that it still works.

Final code is

update 
    tx_test_queue
set 
    status='running'
where
    job_id in (
        select
            job_id
        from
            tx_test_queue
        where
            status='queued'
        order by 
            job_id asc
        limit 1 for update
    )
returning job_id;
mackross
  • This solution explicitly removes the transaction. Why? – Yitz Sep 25 '16 at 14:40
  • pg implicitly adds a transaction around the statement – mackross Sep 26 '16 at 19:12
  • @mackross It would be better to use `for update skip locked` instead of just `for update` because it will not block other concurrent readers. Perhaps the reason you didn't use it is that the `skip locked` clause wasn't available in 2015. – G S Apr 30 '21 at 05:31

What about just a SELECT?

SELECT * FROM table WHERE status = 'QUEUED' LIMIT 10 FOR UPDATE SKIP LOCKED;

https://www.postgresql.org/docs/9.5/static/sql-select.html#SQL-FOR-UPDATE-SHARE
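To actually reserve the rows rather than just read them, the same clause can be folded into the UPDATE from the question. A sketch (requires PostgreSQL 9.5+; 'me' stands in for whatever owner value the consumer uses):

```sql
UPDATE jobrecord
   SET state = 'RESERVED', owner = 'me'  -- 'me' is a placeholder owner id
 WHERE id IN (
        SELECT id
          FROM jobrecord
         WHERE state = 'QUEUED'
         ORDER BY id
         LIMIT 10
           FOR UPDATE SKIP LOCKED)
RETURNING id;
```

Locked rows are skipped rather than waited on, so concurrent consumers each grab a disjoint batch without blocking one another.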

questionaire

Read my post here:

https://stackoverflow.com/a/6500830/32688

If you use a transaction and LOCK TABLE, you will have no problems.
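A minimal sketch of that pattern with the table from the question (ACCESS EXCLUSIVE works, but see the comments below about its impact on pg_dump; 'me' is a placeholder owner value):

```sql
BEGIN;
-- serializes competing consumers for the duration of the transaction
LOCK TABLE jobrecord IN ACCESS EXCLUSIVE MODE;
UPDATE jobrecord
   SET owner = 'me', state = 'RESERVED'
 WHERE id = (SELECT id FROM jobrecord
              WHERE state = 'QUEUED'
              ORDER BY id LIMIT 1)
RETURNING id;
COMMIT;
```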

jordani
  • Thank you, adding the table lock `LOCK TABLE slots IN ACCESS EXCLUSIVE MODE;` (and fixing my transaction statements in Qt) was the solution. Now it is working as expected. Thank you! – code_talker Jun 29 '11 at 16:21
  • ACCESS EXCLUSIVE will work, however it has serious performance side effects. Notably, it will block during a pg_dump, which uses [ACCESS SHARE](http://www.postgresql.org/docs/8.4/static/explicit-locking.html) mode. This means your queue will block for the length your backup takes; in our case that can be a couple of hours. – apinstein Jul 15 '11 at 02:46

You might want to look at how queue_classic does it. https://github.com/ryandotsmith/queue_classic

The code is pretty short and easy to understand.

Joe Van Dyk

Okay, here is the solution that is working for me, based on the link from jordani. As some of my problems were in the way Qt-SQL works, I've included the Qt code:

QSqlDatabase db = GetDatabase();
db.transaction();
QSqlQuery lockQuery(db);
bool lockResult = lockQuery.exec("LOCK TABLE jobrecord IN ACCESS EXCLUSIVE MODE; ");
QSqlQuery query(db);
query.prepare(    
"UPDATE jobrecord "
"  SET \"owner\"= :owner, state = :reserved "
"  WHERE id = ( "
"    SELECT id from jobrecord WHERE state = :queued ORDER BY id LIMIT 1 "
"  ) RETURNING id;"
);
query.bindValue(":owner", pid);
query.bindValue(":reserved", JobRESERVED);
query.bindValue(":queued", JobQUEUED); 
bool result = query.exec();

To check, if multiple consumers process the same job, I added a rule and a log-table:

CREATE TABLE serverjobrecord_log
(
  serverjobrecord_id integer,
  oldowner text,
  newowner text
) WITH ( OIDS=FALSE );


CREATE OR REPLACE RULE ownerrule AS ON UPDATE TO jobrecord
WHERE old.owner IS NOT NULL AND new.state = 1
DO INSERT INTO serverjobrecord_log (serverjobrecord_id, oldowner, newowner)
    VALUES (new.id, old.owner, new.owner);

Without the LOCK TABLE ... IN ACCESS EXCLUSIVE MODE; statement, the log table occasionally fills with entries where one consumer has overwritten the values of another; with the LOCK statement, the log table remains empty :-)

code_talker

Check out PgQ instead of reinventing the wheel.

Sean
  • I checked out PgQ already, but unfortunately it will send the same event to all consumers: `You can have as many consumers as you want to on the same event queue, but they will all see the same events rather than share the workload.` This is exactly what I do **not** want to happen. I would have preferred to use an existing solution, but couldn't find one that fulfills my requirements. – code_talker Jun 29 '11 at 07:57
  • Ah, see, I'd use PgQ then have each consumer register a lock on the work in a different table using an `UPDATE ... RETURNING` statement. Food for thought. – Sean Jun 30 '11 at 17:13