
I have a lot of data in the form of a list of dictionaries, and I want to insert all of it into a Snowflake table.

The primary key on the table is ID. I can receive new data for an ID that is already present, in which case I need to update that row. What I have done so far, since the data is large, is insert each batch into a temporary table and then run a MERGE query from the temporary table to update/insert into the main table.

    def batch_data(data, chunk_size):
        for i in range(0, len(data), chunk_size):
            yield data[i:i + chunk_size]

    def upsert_user_data(self, user_data):
        columns = ['"' + col + '"' for col in user_data[0].keys()]
        placeholders = ['?' for _ in user_data[0].keys()]
        sql = f"INSERT INTO TEMP ({','.join(columns)}) VALUES ({','.join(placeholders)});"

        for chunk in batch_data(user_data, 1000):
            data_to_load = [list(row.values()) for row in chunk]
            snowflake_client.run(sql, tuple(data_to_load))

        sql = (
            "MERGE INTO USER USING (SELECT ID AS TID, NAME AS TNAME, STATUS AS TSTATUS FROM TEMP) AS TEMPTABLE "
            "ON USER.ID = TEMPTABLE.TID "
            "WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPTABLE.TNAME, USER.STATUS = TEMPTABLE.TSTATUS "
            "WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) VALUES (TEMPTABLE.TID, TEMPTABLE.TNAME, TEMPTABLE.TSTATUS);"
        )

        snowflake_client.run(sql)

Is there any way I can remove the temporary table and run only the MERGE query, in batches?
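One idea I'm considering is to run the MERGE directly against a `VALUES` clause for each batch, so no staging table is needed. This is only a sketch, not something I have verified: it assumes Snowflake's `column1`, `column2`, ... aliases for rows in a `FROM VALUES` subquery, that each dict has `ID`, `NAME`, and `STATUS` keys, and that `snowflake_client.run` accepts a flat parameter tuple. The helper below just builds the SQL and parameter list for one batch:

```python
def build_merge_sql(chunk):
    """Build (sql, params) for one batch of {'ID', 'NAME', 'STATUS'} dicts.

    The batch is inlined as a VALUES clause with bind placeholders, so the
    MERGE runs directly against the incoming rows instead of a TEMP table.
    """
    # One "(?,?,?)" group of placeholders per row in the batch.
    placeholders = ",".join(["(?,?,?)"] * len(chunk))
    sql = (
        "MERGE INTO USER USING ("
        "SELECT column1 AS TID, column2 AS TNAME, column3 AS TSTATUS "
        f"FROM VALUES {placeholders}) AS TEMPTABLE "
        "ON USER.ID = TEMPTABLE.TID "
        "WHEN MATCHED THEN UPDATE SET USER.NAME = TEMPTABLE.TNAME, "
        "USER.STATUS = TEMPTABLE.TSTATUS "
        "WHEN NOT MATCHED THEN INSERT (ID, NAME, STATUS) "
        "VALUES (TEMPTABLE.TID, TEMPTABLE.TNAME, TEMPTABLE.TSTATUS);"
    )
    # Flatten the rows to match the placeholder order: ID, NAME, STATUS per row.
    params = [v for row in chunk for v in (row["ID"], row["NAME"], row["STATUS"])]
    return sql, params
```

The calling loop would stay the same as in my current code, e.g. `for chunk in batch_data(user_data, 1000): sql, params = build_merge_sql(chunk); snowflake_client.run(sql, tuple(params))`. I'm not sure whether this performs well for large batches compared to the TEMP-table approach.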

  • Try batch approach often used in hive (union all+row_number of full join and reload all the table): https://stackoverflow.com/a/44755825/2700344 – leftjoin May 28 '21 at 08:37
  • I'm not sure I understand: Why do you need the TEMP table? Are you importing data by running multiple INSERTs? Have you considered just importing them as external tables/files? – Felipe Hoffa May 28 '21 at 19:20
