I have a Cassandra table which looks like this

CREATE TABLE tmp.inventory (
    t_id text,
    is_available boolean,
    modified_at bigint,
    price double,
    available_units bigint,
    PRIMARY KEY(t_id, modified_at)
) WITH CLUSTERING ORDER BY (modified_at);

I have a streaming pipeline which updates the items in Cassandra. The pipeline is checkpointed at an interval, so when it fails, it re-processes the source data since the last successful checkpoint. When it re-processes after a failure, it will try to overwrite data in Cassandra that was already written successfully (i.e. after the last successful checkpoint but before the failure). I was thinking of using the modified_at column to guard against this. Something like:

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? AND modified_at < ?

This is my attempt to apply the update only if the modified_at in Cassandra is less than the modified_at in the pipeline. However, this throws InvalidQueryException: Slice restrictions are not supported on the clustering columns in UPDATE statements

I thought an IF condition could help in this case.

UPDATE tmp.inventory SET is_available = ? WHERE t_id = ? IF modified_at < ?

but this throws InvalidQueryException: PRIMARY KEY column 'modified_at' cannot have IF conditions

So what would be the ideal way to handle this?

Edit: If these were the only fields in the table, re-processing the events would not be that big of a problem, since the table eventually becomes consistent once the pipeline catches up to the live stream. But say there is another streaming job which updates the same table with the current price, available units, etc. In that case, if one of the jobs fails and restarts, the table could be left in an inconsistent state.

David Anderson
Sivaprasanna Sethuraman
  • are you setting `modified_at` based on the time of processing? Can you take it from the source data itself - in this case it won't matter if you write the same data again – Alex Ott Apr 24 '21 at 18:08
  • I'm taking it from the source data only. My input event looks like `{"ts": ..., "t_id": "...", "is_available": ...}`. I use the `ts` field for `modified_at` – Sivaprasanna Sethuraman Apr 24 '21 at 18:13
  • Then what is the problem? If you insert the same entry with the same primary key, you won’t create a visible duplicate - you’ll get duplicate on storage level, but it will be evicted by compaction – Alex Ott Apr 24 '21 at 18:25
  • I updated the question with new info. There will be another streaming job that will update other fields based on the primary key. – Sivaprasanna Sethuraman Apr 24 '21 at 18:29
  • Will the other job update the same entry? If yes, will it be different ts field? For example earlier? – Alex Ott Apr 24 '21 at 18:39
  • Yes. The other job will update the same entry (t_id), most probably with a different ts value. Its event can arrive after Job #1 has made a write yet carry an earlier ts (because of out-of-order data; in that case I would like to ignore it), but most probably it will carry a later ts than the one sent by Job #1. – Sivaprasanna Sethuraman Apr 24 '21 at 18:42
  • look into the answer – Alex Ott Apr 25 '21 at 10:48
  • Hi.. Sure, That seems possible. Let me try it out and update here. – Sivaprasanna Sethuraman Apr 25 '21 at 14:56

1 Answer

To avoid a situation where one thread writes older data after another thread has already inserted newer data, you can use USING TIMESTAMP with INSERT or UPDATE (in Cassandra everything is an upsert anyway, so INSERT could be easier from a syntax perspective, imho). The idea is that you explicitly specify the write timestamp of the record. When another thread later inserts older data, that data is still written, but it won't win, because Cassandra uses the (explicitly specified) timestamp to determine the latest version. Something like this:

INSERT INTO tmp.inventory (t_id, is_available, modified_at)
  VALUES (?, ?,?)
  USING TIMESTAMP <modified_at*1000>
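To make the "older data won't win" point concrete, here is a toy Python model of per-cell last-write-wins. It is only an illustration, not Cassandra's implementation: the `LwwTable` class, the `item-1` key, and the timestamp values are made up for the example, and Cassandra's tie-breaking for equal timestamps is left out.

```python
from dataclasses import dataclass
from typing import Any, Dict, Tuple


@dataclass
class Cell:
    value: Any
    write_ts_micros: int  # the timestamp that USING TIMESTAMP would carry


class LwwTable:
    """Hypothetical toy model of last-write-wins per cell (not Cassandra itself)."""

    def __init__(self) -> None:
        self.cells: Dict[Tuple[str, str], Cell] = {}

    def upsert(self, key: str, column: str, value: Any, write_ts_micros: int) -> None:
        current = self.cells.get((key, column))
        # Keep whichever version has the higher write timestamp.
        # Equal timestamps keep the existing cell here; real Cassandra
        # applies its own tie-breaking rules, which this sketch omits.
        if current is None or write_ts_micros > current.write_ts_micros:
            self.cells[(key, column)] = Cell(value, write_ts_micros)

    def read(self, key: str, column: str) -> Any:
        return self.cells[(key, column)].value


table = LwwTable()
# Newer event is written first...
table.upsert("item-1", "is_available", True, 1_619_287_200_000_000)
# ...then a replayed or out-of-order older event arrives later.
table.upsert("item-1", "is_available", False, 1_619_287_100_000_000)

result = table.read("item-1", "is_available")
print(result)  # True: the older write did not overwrite the newer one
```

The late write is accepted without an error, but a read still returns the value with the higher timestamp, which is the behaviour the INSERT above relies on.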

The only thing to remember is that the value in USING TIMESTAMP is in microseconds rather than milliseconds, so you need to compute the value of <modified_at*1000> yourself before running the query; you can't put an expression there (it's written that way here only as an example).
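As a rough sketch of doing that calculation on the client side, assuming modified_at holds epoch milliseconds and that the write timestamp is passed as a bind marker (`USING TIMESTAMP ?`) of a prepared statement. The helper names `to_write_timestamp_micros` and `bind_values` are hypothetical, and no driver call is shown:

```python
def to_write_timestamp_micros(modified_at_ms: int) -> int:
    """Convert epoch milliseconds to the microseconds USING TIMESTAMP expects."""
    return modified_at_ms * 1000


# Assumption: the timestamp is supplied as a fourth bind marker rather than
# being written as an expression inside the CQL text.
INSERT_CQL = (
    "INSERT INTO tmp.inventory (t_id, is_available, modified_at) "
    "VALUES (?, ?, ?) USING TIMESTAMP ?"
)


def bind_values(t_id: str, is_available: bool, modified_at_ms: int) -> tuple:
    """Build the values for INSERT_CQL, in bind-marker order."""
    return (
        t_id,
        is_available,
        modified_at_ms,                             # stored in the modified_at column
        to_write_timestamp_micros(modified_at_ms),  # used as the write timestamp
    )


values = bind_values("item-1", True, 1_619_287_200_000)
print(values[3])  # 1619287200000000
```

The resulting tuple would then be handed to whatever client executes the prepared statement.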

Alex Ott