
I would like to load data from MongoDB incrementally into Azure Storage using Azure Data Factory. I couldn't find any relevant documentation on how to do this. I would appreciate it if there is a way to achieve this with Azure Data Factory. I have already checked the links below, but none of them talk about loading data incrementally from MongoDB.

https://learn.microsoft.com/en-us/azure/data-factory/connector-mongodb?tabs=data-factory

Load data from MongoDB incrementally.

Suresh
  • Please provide enough code so others can better understand or reproduce the problem. – Jason J. Nathan Jul 11 '23 at 11:37
  • We cannot incrementally append data to an existing file in Azure Storage. We can instead create a new file (whose name contains the date at which it was created) for each load, under the same folder in Azure Blob Storage. – Aswin Jul 12 '23 at 06:31
  • Thanks Aswin. Do you have any relevant articles/links which you can forward, please? – Suresh Jul 12 '23 at 08:36
  • You can check this [answer](https://stackoverflow.com/a/74428748/19986107). In that answer, Azure SQL DB is used as the sink. – Aswin Jul 12 '23 at 08:42
  • Thanks a lot, Aswin, for the response. In the copy activity source, we are getting an error when trying this expression; it says it's 'Invalid'. In Mongo, 'Created_date' is a string in the JSON document, and the lookup field (watermark) is a datetime in Azure SQL. How do we get around this issue? Appreciate all your help. {"created_date":{$gt:@{activity('Lookup1').output.firstRow.date_col}} – Suresh Jul 18 '23 at 02:07
  • Could you share the exact error message, perhaps with a screenshot? – Aswin Jul 18 '23 at 03:18

1 Answer


In order to copy data from Azure Cosmos DB for MongoDB API to Azure Blob Storage incrementally, you need to maintain a watermark table. This table holds the timestamp of the last pipeline run. If you store the watermark table in the MongoDB API, ADF has no option to query it, so I am keeping the watermark table in an Azure SQL database instead.
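A minimal sketch of that watermark table, assuming a single-row table named watermark with one datetime column date_col (these names match the queries used in the steps below):

-- Hypothetical DDL for the watermark table in Azure SQL Database
CREATE TABLE watermark (
    date_col DATETIME NOT NULL
);

-- Seed it with a date older than any document, so that the first run copies everything
INSERT INTO watermark (date_col) VALUES ('1900-01-01');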


Initially, the value stored in the watermark table is 1900-01-01. Steps to be followed in ADF:

  • Take the Lookup activity with its dataset pointing to the watermark table. Give the query as select date_col from watermark. Its first-row output is sketched below.

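For reference, the Lookup activity's output that the later expressions read from has roughly this shape (a sketch; the exact value depends on your watermark row):

{
  "firstRow": {
    "date_col": "1900-01-01T00:00:00"
  }
}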

  • Then take the Copy activity next to the Lookup activity. Use a MongoDB dataset as the source. In the filter, type the following to select only the rows created after the watermark value (if created_date is stored as a string, see the formatting note below):
{"created_date": {"$gt": "@{activity('Lookup1').output.firstRow.date_col}"}}
  • Create a sink dataset and make the filename dynamic: @concat('filename_',utcnow()). This appends the datetime at which the file was created to the filename; a friendlier variant is sketched below.

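Since utcnow() returns a full timestamp (e.g. 2023-07-18T02:07:00.0000000Z), you may prefer a compact, filesystem-friendly form. A sketch, assuming a JSON sink (the .json extension is an assumption):

@concat('filename_', formatDateTime(utcnow(), 'yyyyMMddHHmmss'), '.json')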

  • Take the Script activity after the Copy activity and give the query as:
update watermark
set date_col='@{utcNow()}'

This will update the watermark table with the current UTC time. Thus, on the next pipeline run, any rows created in the MongoDB API after this timestamp will be copied to a new file.
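One possible refinement (my assumption, not part of the original answer): rows inserted into MongoDB while the Copy activity is running can fall between the copy's read and the utcNow() recorded afterwards, and would be missed. Recording the pipeline's trigger time, which is captured before the copy starts, avoids missing rows at the cost of possibly copying a few rows twice:

-- Hypothetical variant: record the run's start time instead of the script's execution time
update watermark
set date_col = '@{pipeline().TriggerTime}'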

Aswin