
With the setting SHOW_INITIAL_ROWS = TRUE, we created a stream on top of a view (which contains many joins). We then created a stored procedure with a single MERGE statement that ingests all of the data from the stream into a target table. The following is the MERGE statement used by the stored procedure.

merge into target tgt
using
(
    select id, fname, metadata$action, metadata$isupdate
    from emp_stream
    where not (metadata$action = 'DELETE' and metadata$isupdate = 'TRUE')
) src
on src.id = tgt.id
when matched and src.metadata$action = 'DELETE' and src.metadata$isupdate = 'FALSE' then
    delete
when matched and src.metadata$action = 'INSERT' and src.metadata$isupdate = 'TRUE' then
    update set tgt.id    = src.id,
               tgt.fname = src.fname
when not matched and src.metadata$action = 'INSERT' and src.metadata$isupdate = 'FALSE' then
    insert (id, fname) values (src.id, src.fname);

A task was created to run the stored procedure every 8 hours. The first run, i.e. the full load, succeeded and inserted all of the records from the view into the target table. However, the second load failed with a duplicate row error. When we queried the stream, we found two records with the same PK (i.e., id) but different METADATA$ROW_ID values, with METADATA$ACTION values of INSERT and DELETE respectively and METADATA$ISUPDATE set to FALSE for each.
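For reference, a query along these lines (a sketch only, reusing the stream and column names from the MERGE above) is what surfaces the affected PKs and their stream metadata:

```sql
-- List stream rows whose id occurs more than once, together with the
-- stream metadata columns, to inspect the INSERT/DELETE pairs.
select id,
       fname,
       metadata$action,
       metadata$isupdate,
       metadata$row_id
from emp_stream
qualify count(*) over (partition by id) > 1
order by id, metadata$action;
```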

If it were an update, METADATA$ISUPDATE should be set to TRUE, which is not the case here. Could someone please assist us with this?

Trying to do an incremental load using Streams in Snowflake but facing a duplicate row error.

  • Do you have any timestamp in the data itself that would determine which record is the latest? If so, you can use it along with a QUALIFY + ROW_NUMBER() window function to return only the latest row – Dean Flinter Nov 17 '22 at 15:29
  • Can you share the get_ddl() on your stream? – Greg Pavlik Nov 17 '22 at 16:34
  • @GregPavlik create or replace stream EMP_STREAM on view EMP_VIEW; – shiva nagesh Nov 18 '22 at 03:41
  • 1
    @DeanFlinter Both records have the same timestamp. To clarify further, the PK for which we are having issues was updated long time ago (i.e., even before the creation of stream). The record was successfully inserted into the target table during the initial load. But somehow it is showing up in the stream in the second load as two different records. If the metadata$isupdate = 'TRUE' the deleted record would have been filtered out in the where clause. There are numerous such PKs for which we are experiencing this problem. – shiva nagesh Nov 18 '22 at 08:20
  • Perhaps this is what you are seeing? https://docs.snowflake.com/en/sql-reference/sql/create-stream.html#output . Particularly this line: "Note that streams record the differences between two offsets. If a row is added and then updated in the current offset, the delta change is a new row. The METADATA$ISUPDATE row records a FALSE value." – Dean Flinter Nov 18 '22 at 13:03
  • Thanks for the response @DeanFlinter. But the records were not updated/modified recently. They were present in the table before we created the stream on it. – shiva nagesh Nov 18 '22 at 15:10
  • If they are from the table before the stream was created, they should disappear from the stream after the initial load as the offset should move once the data has been consumed. Are you saying the data is still there after the initial load? – Dean Flinter Nov 21 '22 at 16:28
  • @DeanFlinter No. The stream is empty after the initial load. But somehow those records are showing up in the second load. – shiva nagesh Nov 22 '22 at 05:57
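
Following the deduplication approach Dean Flinter suggests in the comments, the stream subquery could be collapsed to one row per id before the MERGE. This is a sketch only: the ordering column (here the hypothetical last_updated) is an assumption and does not exist in the view as shown; with identical timestamps another tiebreaker would be needed.

```sql
-- Sketch: keep a single row per id from the stream before merging.
-- last_updated is hypothetical; with identical timestamps you may need
-- metadata$action or another column as the tiebreaker instead.
select id,
       fname,
       metadata$action,
       metadata$isupdate
from emp_stream
where not (metadata$action = 'DELETE' and metadata$isupdate = 'TRUE')
qualify row_number()
        over (partition by id order by last_updated desc) = 1;
```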

0 Answers