
I am trying to load files from my Azure blob into a Snowflake table incrementally. In Snowflake, I then put streams on that table and load the data into the target table.
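
For reference, the Snowflake side looks roughly like this (a sketch; the object and column names are illustrative, not my actual ones):

```sql
-- Illustrative objects only; the real table, stream and column names differ.
CREATE OR REPLACE STREAM staging_stream ON TABLE staging_table;

-- Consume the stream and apply the changes to the target table.
MERGE INTO target_table AS t
USING staging_stream AS s
  ON t.id = s.id
WHEN MATCHED THEN UPDATE SET t.val = s.val
WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val);
```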

I am unable to do an incremental load from Azure to Snowflake. I have tried several approaches, but none of them work. I am attaching images of my two different pipelines that attempt to do this.

In the first pipeline, I just added 3 extra columns that I wanted in my sink.

In the second pipeline, I tried creating conditional splits.

Neither of these has worked out. Kindly suggest how I should go about this.

Coder1990
  • can you add some clarification to what you mean by "incremental", and where it's going wrong? You want to only load new files from the Azure Blob, but it's picking up all of them? You want to insert new records, but the job is truncating before it inserts? You want to load upserts into the table but it's only running inserts? – David Garrison Oct 13 '21 at 14:53
  • I want to load newly inserted records as well as any updated records from the Azure Blob to the Snowflake table. And yes, it is only loading inserts. Let me know if you want more info. – Coder1990 Oct 14 '21 at 06:14
  • Hello @Coder1990, was my answer helpful? – NiharikaMoola-MT Oct 18 '21 at 04:46

1 Answer


You can achieve this by selecting Allow upsert as the Update method in the sink settings.

Below are my repro details:

  1. This is the staging table in Snowflake into which I am loading the incremental data.

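The staging table has roughly this shape; the DDL below is only a sketch, and the data types are assumptions:

```sql
-- Illustrative DDL for the staging table; the data types are assumptions.
CREATE OR REPLACE TABLE stg_state (
    statecode     VARCHAR(2),
    flag          VARCHAR(10),
    modified_date TIMESTAMP_NTZ
);
```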

  2. Source file – incremental data

a) This file contains records that exist in the staging table (StateCode = ‘AK’ & ‘CA’), so these 2 records should be updated in the staging table with new values in Flag.

b) The other 2 records (StateCode = ‘FL’ & ‘AZ’) should be inserted into the staging table.


  3. Dataflow source settings:


  4. Add a DerivedColumn transformation to add a column modified_date, which is not in the source file but exists in the sink table.


  5. Add an AlterRow transformation: when you use the Upsert method, an AlterRow transformation is required to specify the upsert condition.

a) In the condition, you can specify that rows should be upserted only when the unique column (StateCode in my case) is not null.


  6. Add a sink transformation after the AlterRow, with the Snowflake staging table as the sink dataset.


a) In the sink settings, set the Update method to Allow upsert and provide the key (unique) column on which the upsert in the sink table should be based.

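Conceptually, with StateCode as the key column, the upsert performed by the sink behaves like a Snowflake MERGE of the incoming rows into the staging table. The sketch below is only illustrative (ADF generates its own statements, and incoming_rows stands in for the rows that the AlterRow step marked for upsert):

```sql
-- Rough SQL equivalent of the Allow upsert sink, keyed on StateCode.
MERGE INTO stg_state AS t
USING incoming_rows AS s
  ON t.statecode = s.statecode
WHEN MATCHED THEN
  UPDATE SET t.flag = s.flag, t.modified_date = s.modified_date
WHEN NOT MATCHED THEN
  INSERT (statecode, flag, modified_date)
  VALUES (s.statecode, s.flag, s.modified_date);
```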

  7. After you run the pipeline, you can see the results in the sink.

a) As StateCode AK & CA already exist in the table, only the other column values (Flag & modified_date) for those rows are updated.

b) StateCode AZ & FL are not found in the staging (sink) table, so these rows are inserted.

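You can confirm the outcome directly in Snowflake with a quick query (the table name follows the DDL sketch above):

```sql
-- Inspect the staging table after the pipeline run.
SELECT statecode, flag, modified_date
FROM stg_state
ORDER BY statecode;
```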

NiharikaMoola-MT
  • Hi Nihar, thanks for your answer. I followed the same setup to implement the upsert. I carefully checked the SQL statements executed in Snowflake for the ADF task. It seems that: 1. All rows in the source file are loaded into Snowflake, not only the updated rows. In your example, only the updated rows are loaded into Snowflake. Correct me if I am wrong. 2. Since all rows are loaded into Snowflake, the UPDATE SQL statement will update all rows in the Snowflake table, but the intention is to update only the delta rows. Do you have any idea about this? Thanks. – leon Apr 02 '22 at 05:56