
I'm learning AWS Glue. With traditional ETL a common pattern is to look up the primary key from the destination table to decide whether you need to do an update or an insert (aka the upsert design pattern). With Glue there doesn't seem to be that same control: plainly writing out the dynamic frame is just an insert process. There are two design patterns I can think of to solve this:

  1. Load the destination as a data frame and, in Spark, left outer join so that only new rows are inserted (how would you update rows if you needed to? Delete then insert? Since I'm new to Spark this is the most foreign option to me; see the sketch after this list)
  2. Load the data into a stage table and then use SQL to perform the final merge
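
A minimal PySpark sketch of what option 1 could look like, using a left anti join (a left outer join that keeps only unmatched rows). The catalog database, table, JDBC URL, credentials, connection name, and key column are all placeholders, and this only covers the insert-new-rows half of an upsert:

from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from pyspark.context import SparkContext

glue_context = GlueContext(SparkContext.getOrCreate())

# Incoming data for this run (placeholder catalog database/table)
src_df = glue_context.create_dynamic_frame.from_catalog(
    database="my_database", table_name="my_source_table").toDF()

# Read the destination keys back over JDBC (placeholder URL and credentials;
# assumes the SQL Server JDBC driver is available to the job)
dest_keys = glue_context.spark_session.read.jdbc(
    url="jdbc:sqlserver://onprem-host:1433;databaseName=MyDb",
    table="dbo.target_table",
    properties={"user": "username", "password": "userpwd"}).select("id")

# left_anti keeps only source rows whose key is absent from the destination,
# i.e. the rows that are safe to plain-insert; updates still need separate
# handling (delete-then-insert, or the SQL merge in option 2)
new_rows = src_df.join(dest_keys, on="id", how="left_anti")

glue_context.write_dynamic_frame.from_jdbc_conf(
    frame=DynamicFrame.fromDF(new_rows, glue_context, "new_rows"),
    catalog_connection="my-sqlserver-connection",
    connection_options={"dbtable": "dbo.target_table", "database": "MyDb"})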

It's this second method that I'm exploring first. How, in the AWS world, can I execute a SQL script or stored procedure once the AWS Glue job is complete? Do you use a python-shell job, a Lambda, something directly within Glue, or some other way?
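
To make it concrete, the kind of merge I want to run once the staging table is loaded looks roughly like this. Host, credentials, table, and column names are placeholders, and pyodbc is just one way to reach SQL Server from Python; whatever runs this step (python-shell job, Lambda, etc.) would need the library packaged for it:

import pyodbc

# Placeholder connection details; in practice these would come from job
# parameters or Secrets Manager rather than being hard-coded
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 17 for SQL Server};"
    "SERVER=onprem-sql-host,1433;"
    "DATABASE=MyDb;UID=etl_user;PWD=etl_password")

merge_sql = """
MERGE dbo.target_table AS tgt
USING dbo.staging_table AS src
    ON tgt.id = src.id
WHEN MATCHED THEN
    UPDATE SET tgt.col1 = src.col1, tgt.col2 = src.col2
WHEN NOT MATCHED THEN
    INSERT (id, col1, col2) VALUES (src.id, src.col1, src.col2);
"""

cur = conn.cursor()
cur.execute(merge_sql)   # could just as well be an EXEC of a stored procedure
conn.commit()
cur.close()
conn.close()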

Michael Black
  • In which DB are you writing your data (AWS Redshift, RDS)? – Yuriy Bondaruk Apr 03 '19 at 00:20
  • On prem, MS SQL Server. I have the JDBC connection working and I can write my data frame to a table. So I know I have at least the networking and connection part figured out. – Michael Black Apr 03 '19 at 22:19
  • You can do this using py4j: https://stackoverflow.com/questions/64775753/how-to-run-arbitrary-ddl-sql-statements-or-stored-procedures-using-aws-glue – mishkin Jan 21 '21 at 19:10

2 Answers


I have used the pymysql library, uploaded to AWS S3 as a zip file and configured in the AWS Glue job parameters. For UPSERTs, I used INSERT INTO ... ON DUPLICATE KEY UPDATE.

Based on the primary key, the code either updates a record if it already exists or inserts a new one. Hope this helps. Please refer to this:

import pymysql

rds_host = "rds.url.aaa.us-west-2.rds.amazonaws.com"
name = "username"
password = "userpwd"
db_name = "dbname"

conn = pymysql.connect(host=rds_host, user=name, passwd=password,
                       db=db_name, connect_timeout=5)

# Example values for a single record; in practice these come from the row being loaded
zip_code, territory_code, territory_name, state = "90210", "T01", "West Coast", "CA"

# Insert the row, or update territory_name and state if the primary key already exists
insertQry = ("INSERT INTO ZIP_TERR (zip_code, territory_code, territory_name, state) "
             "VALUES (%s, %s, %s, %s) "
             "ON DUPLICATE KEY UPDATE territory_name = VALUES(territory_name), "
             "state = VALUES(state);")

with conn.cursor() as cur:
    cur.execute(insertQry, (zip_code, territory_code, territory_name, state))
    conn.commit()

In the above code sample, zip_code and territory_code are the primary key columns. Please refer here as well: More on looping inserts using a for loop
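
If the job has many records to write, the same statement can also be run as a batch with cursor.executemany instead of a Python-side loop. A small sketch reusing conn and insertQry from the snippet above; the rows list is a stand-in for whatever the job has collected:

# rows is a stand-in for the records collected by the job,
# e.g. from dynamic_frame.toDF().collect()
rows = [
    ("90210", "T01", "West Coast", "CA"),
    ("10001", "T02", "North East", "NY"),
]

with conn.cursor() as cur:
    # Runs the same upsert statement once per tuple
    cur.executemany(insertQry, rows)
    conn.commit()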

Jaco Van Niekerk
Yuva

As always, AWS's evolving feature set resolves many of these problems (driven by user demand and common work patterns).

AWS has published documentation on Updating and Inserting new data using staging tables (the approach you mentioned in your second strategy).

Generally speaking, the most rigorous approach for ETL is to truncate and reload, but whether that is practical depends on your source data. If your source is a time-series dataset spanning billions of records, you may need a delta/incremental load pattern instead.
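
A minimal sketch of what an incremental read could look like in a Glue job, assuming a hypothetical watermark column (last_modified) and/or Glue job bookmarks; the database, table, and column names are placeholders:

from awsglue.context import GlueContext
from pyspark.context import SparkContext
from pyspark.sql import functions as F

glue_context = GlueContext(SparkContext.getOrCreate())

# transformation_ctx lets Glue job bookmarks track what has already been
# processed, if bookmarks are enabled on the job
source_df = glue_context.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_source_table",
    transformation_ctx="incremental_source"
).toDF()

# Alternatively (or additionally), filter on a watermark column so only rows
# changed since the last run are pushed through the upsert
delta_df = source_df.filter(F.col("last_modified") >= F.date_sub(F.current_date(), 1))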

TheBaker