
Consider this table:

create table entries (
  sequence_number integer
    default nextval('entries_sequence_number_seq')
    primary key,
  time timestamp default now()
);

This table is used as an append-only stream of changes. There may be other tables involved in writes, but as the last SQL statement in each transaction, we insert a row into this table. In other words, our transactions may be large and time-consuming, but eventually we write this row and commit immediately.
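Concretely, the write pattern is something like this (the other writes are placeholders):

```sql
BEGIN;

-- ... arbitrary, possibly slow writes to other tables ...

-- Last statement in the transaction: append to the stream.
-- Both columns have defaults, so no explicit values are needed.
INSERT INTO entries DEFAULT VALUES;

COMMIT;
```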

Now we want one or more consumers that can track changes as they are appended to this table:

  • Each consumer needs to loop at regular intervals to fetch the next batch of changes in roughly chronological order — in other words, the delta of new rows appended to entries since the last time the consumer polled.
  • The consumer always goes forward in time, never backwards.
  • Each consumer gets all the data. There's no need for selective distribution.
  • Order of consumption is not important. However, the consumer must eventually see all committed entries: if an in-flight transaction commits a new entry to the table, it must be picked up.
  • We’d like to minimize the possibility of ever seeing the same row twice, but we can tolerate it if it happens.

Conceptually:

select * from entries where sequence_number > :high_watermark

…where the high_watermark is the highest number seen by the consumer.

However, since nextval() is computed before commit time, you can get into a situation where there are gaps caused by in-flight transactions that haven’t committed yet. A race condition can play out like so:

  • Assume world starts at sequence number 0.
  • Writer A txn: Inserts, gets sequence number 1.
  • Writer B txn: Inserts, gets sequence number 2.
  • Writer B txn commits.
  • Newest sequence number is now 2.
  • The consumer does its select on > 0, finds the entry with sequence number 2, and sets 2 as its high_watermark.
  • Writer A txn commits.
  • The consumer does its select on > 2, and thus never sees the entry with sequence number 1.

The window for this race is probably very small in the general case, but it’s still a possibility, and the probability of it occurring increases with the load on the system.
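For concreteness, the interleaving above can be written out as two writer sessions plus the consumer (a sketch, not literal session output):

```sql
-- Session A:
BEGIN;
INSERT INTO entries DEFAULT VALUES;               -- nextval() assigns 1

-- Session B:
BEGIN;
INSERT INTO entries DEFAULT VALUES;               -- nextval() assigns 2
COMMIT;                                           -- row 2 is now visible

-- Consumer:
SELECT * FROM entries WHERE sequence_number > 0;  -- sees only row 2
-- high_watermark := 2

-- Session A:
COMMIT;                                           -- row 1 becomes visible, too late

-- Consumer:
SELECT * FROM entries WHERE sequence_number > 2;  -- row 1 is skipped forever
```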

So far the best solution that comes to mind, though certainly not an elegant one, is to always select with a time predicate as well:

select * from entries
where sequence_number > :newest_sequence_number
or time >= :newest_timestamp

This should theoretically — modulo leap seconds and drifting clocks — guarantee that older entries are seen, at the expense of re-reading rows that appeared in the last batch. The consumer would have to maintain a hopefully small set of already-seen entries that it can ignore. Leap seconds and drifting clocks could be accounted for by padding the timestamp with some unscientific number of seconds. The downside is that the consumer will constantly be reading a bunch of redundant rows. And it just feels a bit clunky and hand-wavy.
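A sketch of that query with an explicit padding, here an admittedly arbitrary five seconds:

```sql
SELECT sequence_number, time
  FROM entries
 WHERE sequence_number > :newest_sequence_number
    OR time >= :newest_timestamp - interval '5 seconds';
-- The consumer then discards any sequence_number it remembers
-- having already processed.
```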

A slightly blunter, but more deterministic approach would be to maintain an unlogged table of pending events, and always delete from it as we read from it. This has two downsides: One is performance, obviously. The other is that since there may be any number of consumers, we would have to produce one event per consumer, which in turn means we have to identify the consumers by some sort of unique ID at event-emitting time, and of course garbage-collect unused events when a consumer no longer exists.
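A sketch of that approach; the consumers table and all names are illustrative:

```sql
CREATE UNLOGGED TABLE pending_events (
  consumer_id     integer NOT NULL,
  sequence_number integer NOT NULL,
  PRIMARY KEY (consumer_id, sequence_number)
);

-- At event-emitting time, fan out one row per registered consumer:
INSERT INTO pending_events (consumer_id, sequence_number)
SELECT consumer_id, :new_sequence_number FROM consumers;

-- Each consumer drains its own queue atomically:
DELETE FROM pending_events
 WHERE consumer_id = :my_consumer_id
 RETURNING sequence_number;
```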

It strikes me that a better approach than an unlogged table would be to use LISTEN/NOTIFY, with the ID of the entry as the payload. This has the advantage of avoiding polling in the first place, although that's not a huge win, since the point of the consumer in this application is to wake up only now and then and keep load on the system low. On the other hand, the only major downside I can see is that there is a limit (albeit a large one) to the number of notifications that can be queued, and that transactions will begin to fail if a notification cannot be queued. This might be a reasonable compromise, however.
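A sketch of the LISTEN/NOTIFY variant, with an arbitrary channel name. Notably, notifications are only delivered when the writing transaction commits, which sidesteps the gap problem:

```sql
-- Trigger function that broadcasts each new entry's ID.
CREATE FUNCTION notify_entry() RETURNS trigger AS $$
BEGIN
  PERFORM pg_notify('entries_feed', NEW.sequence_number::text);
  RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER entries_notify
  AFTER INSERT ON entries
  FOR EACH ROW EXECUTE PROCEDURE notify_entry();

-- Each consumer connects and runs:
LISTEN entries_feed;
-- ...then waits for notifications on its connection.
```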

At the same time, something in the back of my mind is telling me that there must be a mathematically more elegant way of doing this with even less work.

Alexander Staubo
  • http://stackoverflow.com/questions/6507475/job-queue-as-sql-table-with-multiple-consumers-postgresql –  Oct 06 '16 at 06:19
  • Not the same problem. I don't need or want to lock – each consumer is getting all the incoming data, and consumers do not need to mark anything as reserved or done or anything like that. – Alexander Staubo Oct 06 '16 at 15:45

1 Answer


Your improved idea with WHERE time >= :newest_timestamp is subject to the same race condition, because there is no guarantee that the timestamps are in commit order: a process can go to sleep between computing now() and committing.

Add a boolean field consumed_n for each consumer which is initialized to FALSE. Consumer n then uses:

    UPDATE entries
       SET consumed_n = TRUE
       WHERE NOT consumed_n
       RETURNING sequence_number, time;

It helps to have a partial index per consumer, e.g. CREATE INDEX ON entries ((1)) WHERE NOT consumed_n; such an index contains only the unconsumed rows, so it stays small.

If that takes up too much storage for your taste, use one bit(n) field with a bit for each consumer.
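That variant might look like this for, say, eight consumers. Assuming a reasonably recent PostgreSQL where get_bit/set_bit accept bit strings; they index bits from zero, so consumer n uses position n - 1:

```sql
ALTER TABLE entries ADD COLUMN consumed bit(8) DEFAULT B'00000000';

-- Consumer 3 claims its not-yet-consumed rows:
UPDATE entries
   SET consumed = set_bit(consumed, 2, 1)
 WHERE get_bit(consumed, 2) = 0
 RETURNING sequence_number, time;
```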

The consumers will lock each other out as long as the transaction that issues these statements remains open. So keep it short for good concurrency.

Laurenz Albe
  • Unfortunately, I suspect you missed the part where there are multiple consumers reading the same stream at different positions. Both of your solutions would require that each event is duplicated for each consumer. – Alexander Staubo Oct 07 '16 at 02:51
  • I see. I have improved the answer to match your case better. – Laurenz Albe Oct 07 '16 at 07:33
  • Thanks. But there might be hundreds of concurrent consumers, and they stop/start at arbitrary times. A column per consumer is not feasible, unfortunately. It would also be very hard to track their identity; the only realistic way would be a unique, automatically generated ID per consumer and use something like a string array column. It's not going to work well. The information about what consumer has consumed what is also not interesting to the system past those few seconds where a race can happen. – Alexander Staubo Oct 07 '16 at 17:16
  • I understand. I am afraid that without some kind of marker for the already consumed entries, there is no safe way for a consumer to tell which entries are new. You might remember the processed rows that are not older than two seconds and look for entries not older than the oldest entry you have remembered and that you haven't seen yet, hoping that no transaction will commit more than two seconds after the timestamp it has entered. That's somewhat clumsy and arbitrary, and I have seen commits take longer than two seconds on systems with high I/O load, but I don't think you could do better. – Laurenz Albe Oct 08 '16 at 18:42