Initially, our flow for communicating with Google Pub/Sub was as follows:

  1. The application accepts a message.
  2. It checks that the message doesn't exist in the idempotencyStore.
  3. 3.1 If it doesn't exist, put it into the idempotency store (the key is the value of a unique header, the value is the current timestamp).
    3.2 If it exists, just ignore this message.
  4. When processing is finished, send an acknowledgement.
  5. In the successful-acknowledge callback, remove this message from the metadata store.
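
The steps above can be sketched roughly as follows. This is a minimal in-memory illustration, not the actual application code: the class `OriginalFlowSketch`, its method names, and the use of a `ConcurrentHashMap` in place of the real store are all assumptions made for the example.

```java
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical in-memory stand-in for the idempotency store described above. */
class OriginalFlowSketch {

    // key: value of the unique message header, value: timestamp of acceptance
    private final ConcurrentMap<String, String> idempotencyStore = new ConcurrentHashMap<>();

    /** Steps 2-3: returns true if the message is new and should be processed. */
    boolean accept(String uniqueHeaderValue) {
        String now = String.valueOf(Instant.now().toEpochMilli());
        // putIfAbsent returns null only when no entry existed for this key
        return idempotencyStore.putIfAbsent(uniqueHeaderValue, now) == null;
    }

    /** Step 5: invoked from the successful-acknowledge callback. */
    void onAckSuccess(String uniqueHeaderValue) {
        idempotencyStore.remove(uniqueHeaderValue);
    }
}
```

In this sketch, once `onAckSuccess` has removed the key, a late redelivery of the same message passes `accept` again, because nothing remembers that the message was already handled.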

Point 5 is wrong because, theoretically, we can get a duplicate message even after the message has been processed. Moreover, we found out that sometimes a message might not be removed even though the successful callback was invoked (see "Message is received from Google Pub/Sub subscription again and again after acknowledge [Heisenbug]"). So we decided to update the value after the message is processed and replace the timestamp with the "FiNISHED" string.

But sooner or later this table will become overcrowded, so we have to clean up messages in the MetadataStore. We can remove messages that have been processed and whose processing finished more than 1 day ago.

As was mentioned in the comments of https://stackoverflow.com/a/51845202/2674303, I can add an additional column to the metadataStore table where I could mark whether a message is processed. That is not a problem at all. But how can I use this flag in my cleaner? MetadataStore has only a key and a value.

Artem Bilan
gstackoverflow

1 Answer

> In the successful-acknowledge callback, remove this message from the metadata store

I don't see a reason for this step at all.

Since you say that you store a timestamp in the value, you can analyze this table from time to time and remove entries that are definitely old.

In one of my projects we have a daily job in the DB that archives a table for better performance of the main process, simply because we don't need the old data any more. For this we check a timestamp in the row to determine whether it should go into the archive or not. I wouldn't remove data immediately after processing, because there is a chance of redelivery from the external system.

On the other hand, for better performance I would add an extra indexed column of timestamp type to that metadata table and populate its value via a trigger on each update or insert. Well, the MetadataStore just inserts an entry from the MetadataStoreSelector:

return this.metadataStore.putIfAbsent(key, value) == null;

So, you need an on_insert trigger to populate that date column. This way you will know at the end of the day whether you need to remove an entry or not.
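
To make the idea concrete, here is a minimal in-memory sketch of a table with an extra date column that is filled on insert, plus an end-of-day cleanup. It only models the behavior in plain Java. The class `TimestampedStoreSketch`, the `Row` type, and the `insertedAt` field are hypothetical names, and in a real database the column would be filled by the trigger rather than by application code.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of a metadata table with an extra date column. */
class TimestampedStoreSketch {

    /** One table row: the store's own value plus the extra column. */
    static final class Row {
        final String value;
        final Instant insertedAt; // stands in for the trigger-populated column

        Row(String value, Instant insertedAt) {
            this.value = value;
            this.insertedAt = insertedAt;
        }
    }

    private final Map<String, Row> table = new ConcurrentHashMap<>();

    /** Mirrors putIfAbsent: returns null on insert, the existing value otherwise. */
    String putIfAbsent(String key, String value, Instant now) {
        // Creating the Row with 'now' plays the role of the on-insert trigger.
        Row previous = table.putIfAbsent(key, new Row(value, now));
        return previous == null ? null : previous.value;
    }

    /** End-of-day job: deletes rows inserted before (now - retention). */
    int removeOlderThan(Duration retention, Instant now) {
        Instant cutoff = now.minus(retention);
        int before = table.size();
        table.values().removeIf(row -> row.insertedAt.isBefore(cutoff));
        return before - table.size(); // number of rows removed
    }
}
```

Passing `now` explicitly keeps the sketch deterministic; a scheduled job would typically call `removeOlderThan(Duration.ofDays(1), Instant.now())`.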

Artem Bilan
  • I need to fill in the additional field not at the moment of processing. I need to fill it in at the moment when the initial message was processed. Processing might take an hour, for example – gstackoverflow Dec 26 '19 at 16:53
  • How is that *initial message* related to the idempotent receiver interceptor? Looks like you are trying to use this pattern the wrong way. Your description in the beginning is good, but now you talk about some *initial* and something else. That's confusing... – Artem Bilan Dec 26 '19 at 16:56
  • Moreover, if valueStrategy is null, the value will be a timestamp – gstackoverflow Dec 26 '19 at 16:56
  • I think you misunderstood me. I need to fill in the additional column after step 4 from my description, when the ack callback is invoked – gstackoverflow Dec 26 '19 at 16:58
  • Initial message - just every message we read from the subscription – gstackoverflow Dec 26 '19 at 16:59
  • Why do you need that `additional column` at all? Why is a regular `value` with a `timestamp` not enough for you? – Artem Bilan Dec 26 '19 at 17:00
  • Because I want to rely on the moment when the message was processed instead of the moment it was accepted – gstackoverflow Dec 26 '19 at 17:01
  • Do you mean that it is OK to accept the same message when it was already processed? – Artem Bilan Dec 26 '19 at 17:05
  • I still don't understand what the problem is for you to add an extra column and populate it from the trigger. Even if the metadata store uses only two columns, you are OK to modify others with triggers or other processes. Since you said that you change the value to `FiNISHED`, you can check for this one in the trigger and update the respective extra column for a *finished_date*. So, your cleaner really will know that the value is old enough for removal. – Artem Bilan Dec 26 '19 at 17:09
  • And you can populate that `finished_date` column from your update process when you place that `FiNISHED` into the value. The `MetadataStore` doesn't care about the `VALUE` when it checks before inserting. – Artem Bilan Dec 26 '19 at 17:16
  • I've already implemented populating the additional column in the successful callback and it works. – gstackoverflow Dec 26 '19 at 17:17
  • Good. Doesn't that mean you can accept my answer already and we are good to go celebrate New Year? ;-) – Artem Bilan Dec 26 '19 at 17:19
  • **Do you mean that it is OK to accept the same message when it was already processed?** It is absolutely wrong, but sometimes it happens (https://stackoverflow.com/questions/59391264/message-is-received-from-google-pub-sub-subscription-again-and-again-after-ackno) – gstackoverflow Dec 26 '19 at 17:19
  • Well, I don't mean *redelivery*, my point was about the purpose of `MetadataStore`. I was confused that you would like to receive the same message after that `FINISHED` marker. But now it looks like the `IdempotentReceiver` pattern is your saver and you have a solution to clean up the table from time to time. – Artem Bilan Dec 26 '19 at 17:21
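
The approach the comments converge on (mark the entry as `FiNISHED`, record a *finished_date* at that moment, and let a cleaner delete only entries finished long enough ago) can be sketched as below. This is a plain-Java model under stated assumptions, not the code from the thread: `FinishedDateCleanerSketch`, `Row`, `markFinished`, and `clean` are hypothetical names, and the map stands in for the database table.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of the metadata table with a finished_date column. */
class FinishedDateCleanerSketch {

    static final String FINISHED = "FiNISHED"; // marker value used in the question

    static final class Row {
        volatile String value;         // acceptance timestamp, later the marker
        volatile Instant finishedDate; // extra column, null until processing completes

        Row(String value) {
            this.value = value;
        }
    }

    private final Map<String, Row> table = new ConcurrentHashMap<>();

    /** Idempotency check: only the key matters, the stored value is not inspected. */
    boolean accept(String key, Instant now) {
        return table.putIfAbsent(key, new Row(now.toString())) == null;
    }

    /** Called from the successful-ack callback, possibly long after accept(). */
    void markFinished(String key, Instant now) {
        Row row = table.get(key);
        if (row != null) {
            row.value = FINISHED;
            row.finishedDate = now;
        }
    }

    /** Removes only rows that are finished and whose finished_date is older than retention. */
    int clean(Duration retention, Instant now) {
        Instant cutoff = now.minus(retention);
        int before = table.size();
        table.values().removeIf(r -> FINISHED.equals(r.value)
                && r.finishedDate != null
                && r.finishedDate.isBefore(cutoff));
        return before - table.size();
    }
}
```

Because the cleaner keys on the finish time rather than the acceptance time, a message that took a long time to process is still protected against redelivery for the full retention period after it completes, and unfinished entries are never removed.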