I have a lot of data in the form of a list of dictionaries, and I want to insert all of it into a Snowflake table.
The primary key on the table is ID. New data can arrive for an ID that already exists in the table, in which case I need to update that row instead of inserting a new one. What I have done so far, since the data is large, is insert the data in batches into a temporary table and then run a MERGE from the temporary table to update/insert into the main table.
def batch_data(data, chunk_size):
    for i in range(0, len(data), chunk_size):
        yield data[i:i + chunk_size]
def upsert_user_data(self, user_data):
    # Quoted column names and matching '?' placeholders, taken from the first record.
    columns = ['"' + x + '"' for x in user_data[0].keys()]
    values = ['?' for _ in user_data[0].keys()]
    # Load the rows into the temporary table in chunks of 1000.
    for chunk in batch_data(user_data, 1000):
        sql = f"INSERT INTO TEMP ({','.join(columns)}) VALUES ({','.join(values)});"
        print(sql)
        data_to_load = [[x for x in i.values()] for i in chunk]
        snowflake_client.run(sql, tuple(data_to_load))
sql = "MERGE INTO USER USING (SELECT ID AS TID, NAME AS TNAME, STATUS AS TSTATUS FROM TEMP) AS TEMPTABLE" \
"ON USER.ID = TEMPTABLE.TID WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPTABLE.TNAME, USER.STATUS = TEMPTABLE.TSTATUS " \
"WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) VALUES (TEMPTABLE.TID, TEMPTABLE.TNAME, TEMPTABLE.TSTATUS);"
snowflake_client.run(sql)
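For reference, a plain snowflake-connector-python version of the same flow would look roughly like the sketch below. The connection parameters and TEMP column types are placeholders, it reuses batch_data and user_data from above, and it assumes qmark binding with executemany rather than whatever snowflake_client.run actually does:

# Rough equivalent of the flow above, written directly against snowflake-connector-python.
import snowflake.connector

snowflake.connector.paramstyle = "qmark"  # make '?' placeholders work; must be set before connect()

conn = snowflake.connector.connect(user="...", password="...", account="...")
cur = conn.cursor()
try:
    # Column types are illustrative only.
    cur.execute("CREATE TEMPORARY TABLE TEMP (ID NUMBER, NAME VARCHAR, STATUS VARCHAR)")
    # Columns hard-coded for brevity; the real code builds them from the dict keys.
    insert_sql = 'INSERT INTO TEMP ("ID","NAME","STATUS") VALUES (?,?,?)'
    for chunk in batch_data(user_data, 1000):
        cur.executemany(insert_sql, [list(row.values()) for row in chunk])
    # Same MERGE as above, run once after all chunks are staged.
    cur.execute(
        "MERGE INTO USER USING (SELECT ID AS TID, NAME AS TNAME, STATUS AS TSTATUS FROM TEMP) AS TEMPTABLE "
        "ON USER.ID = TEMPTABLE.TID "
        "WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPTABLE.TNAME, USER.STATUS = TEMPTABLE.TSTATUS "
        "WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) VALUES (TEMPTABLE.TID, TEMPTABLE.TNAME, TEMPTABLE.TSTATUS)"
    )
finally:
    cur.close()
    conn.close()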
Is there any way I can get rid of the temporary table and do the upsert with only a MERGE query, run in batches?
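What I had in mind is something along the lines of the sketch below: build the MERGE source directly from bind parameters with an inline VALUES clause and run one MERGE per chunk, so the temporary table disappears. I am not sure whether Snowflake accepts '?' bind parameters inside a VALUES clause used this way, so treat it as an illustration of the idea rather than working code (it assumes each dictionary has ID, NAME and STATUS keys):

def merge_chunk(chunk):
    # One MERGE per chunk; the source rows come from an inline VALUES clause.
    # Snowflake exposes VALUES columns as column1, column2, ... by default.
    values_clause = ", ".join("(?, ?, ?)" for _ in chunk)
    sql = (
        "MERGE INTO USER USING ("
        "SELECT column1 AS TID, column2 AS TNAME, column3 AS TSTATUS "
        f"FROM VALUES {values_clause}"
        ") AS TEMPTABLE "
        "ON USER.ID = TEMPTABLE.TID "
        "WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPTABLE.TNAME, USER.STATUS = TEMPTABLE.TSTATUS "
        "WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) VALUES (TEMPTABLE.TID, TEMPTABLE.TNAME, TEMPTABLE.TSTATUS);"
    )
    # Flatten the rows into one bind-parameter sequence: (id1, name1, status1, id2, ...).
    params = [v for row in chunk for v in (row["ID"], row["NAME"], row["STATUS"])]
    snowflake_client.run(sql, tuple(params))

for chunk in batch_data(user_data, 1000):
    merge_chunk(chunk)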