4

I understand that there is no UPSERT query one can run directly from Glue against Redshift. Is it possible to implement the staging table concept within the Glue script itself?

My expectation is to create the staging table, merge it with the destination table, and finally delete it. Can this be achieved within the Glue script?

John Rotenstein
Arpit Singh
  • If you already have a Redshift cluster, I would consider using Spectrum to make Glue's external tables accessible from within Redshift; I've used this to essentially eliminate the need for temporary "staging" tables in my workflows. I query the external table directly in the SQL statement that performs the upsert, and I use either AWS Lambda or AWS Batch to execute the SQL statements, depending on how long-running I expect them to be (a sketch follows these comments). – kinzleb Apr 09 '18 at 17:48
  • Probably a duplicate of https://stackoverflow.com/questions/48026111/aws-glue-truncate-redshift-table. – Ashutosh May 04 '18 at 17:27
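
The first comment above describes an alternative that skips staging tables entirely. Below is a minimal sketch of that idea, assuming an external schema spectrum_schema mapped to the Glue Data Catalog, a hypothetical external table events_external, a target table public.events keyed on id, and pg8000 for connectivity; none of these names or credentials come from the original post:

import pg8000

# Lambda-style handler that merges rows from a Glue external table
# (read through Redshift Spectrum) into a native Redshift table.
def handler(event, context):
    conn = pg8000.connect(user='user', database='dbname', host='hosturl',
                          port=5439, password='urpasswrd')
    cursor = conn.cursor()
    # remove rows that are about to be replaced, then insert the new versions
    cursor.execute("DELETE FROM public.events WHERE id IN "
                   "(SELECT id FROM spectrum_schema.events_external)")
    cursor.execute("INSERT INTO public.events SELECT * FROM spectrum_schema.events_external")
    conn.commit()
    conn.close()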

3 Answers

7

It is possible to implement an upsert into Redshift using a staging table in Glue by passing the 'postactions' option to the JDBC sink:

val destinationTable = "upsert_test"
val destination = s"dev_sandbox.${destinationTable}"
val staging = s"dev_sandbox.${destinationTable}_staging"

val fields = datasetDf.toDF().columns.mkString(",")

// Post-actions run after the staging load: merge staging into the destination, then drop staging
val postActions =
  s"""
     DELETE FROM $destination USING $staging AS S
        WHERE $destinationTable.id = S.id
          AND $destinationTable.date = S.date;
     INSERT INTO $destination ($fields) SELECT $fields FROM $staging;
     DROP TABLE IF EXISTS $staging
  """

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "postactions" -> postActions
  )),
  redshiftTmpDir = s"$tempDir/redshift",
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)

Make sure the user Glue writes to Redshift with has sufficient permissions to create and drop tables in the staging schema.

Yuriy Bondaruk
4

Apparently, the connection_options dictionary parameter of the glueContext.write_dynamic_frame.from_jdbc_conf function has two interesting keys: preactions and postactions.

target_table = "my_schema.my_table"
stage_table = "my_schema.#my_table_stage_table"

# preactions: recreate an empty staging table with the same schema as the target
pre_query = """
    drop table if exists {stage_table};
    create table {stage_table} as select * from {target_table} LIMIT 0;""".format(stage_table=stage_table, target_table=target_table)

# postactions: merge the freshly loaded staging rows into the target, then drop staging
post_query = """
    begin;
    delete from {target_table} using {stage_table} where {stage_table}.id = {target_table}.id;
    insert into {target_table} select * from {stage_table};
    drop table {stage_table};
    end;""".format(stage_table=stage_table, target_table=target_table)

datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(
    frame = datasource0, catalog_connection = "test_red", redshift_tmp_dir = 's3://s3path', transformation_ctx = "datasink4",
    connection_options = {"preactions": pre_query, "postactions": post_query,
                          "dbtable": stage_table, "database": "redshiftdb"})

Based on https://aws.amazon.com/premiumsupport/knowledge-center/sql-commands-redshift-glue-job/

Vzzarr
  • If you use this as is, it doesn't actually work: the stage_table will be created before the pre_query is run by the frame writer. So this first writes to stage_table, then drops it, then recreates it like target_table (which will error if target_table doesn't exist, by the way). – Radu Simionescu Jun 02 '20 at 08:26
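
One possible adjustment for the problem raised in the comment above is to make the pre_query tolerant of a missing target table before rebuilding the staging table (here with a plain, non-temporary staging table name). This is only a sketch under those assumptions, not the answer author's code, and the column list for my_schema.my_table is purely illustrative:

target_table = "my_schema.my_table"
stage_table = "my_schema.my_table_stage_table"

# create the target first if it is missing (illustrative columns), then
# rebuild an empty staging table that mirrors its schema
pre_query = """
    create table if not exists {target_table} (id int, payload varchar(256));
    drop table if exists {stage_table};
    create table {stage_table} as select * from {target_table} limit 0;""".format(
    stage_table=stage_table, target_table=target_table)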
3

Yes, it is totally achievable. All you would need is to import the pg8000 module into your Glue job. pg8000 is the Python library used to make a connection to Amazon Redshift and execute SQL queries through a cursor. Python module reference: https://github.com/mfenniak/pg8000

Then make a connection to your target cluster with pg8000.connect(user='user', database='dbname', host='hosturl', port=5439, password='urpasswrd'), use Glue's datasink option to load into the staging table, and then run the upsert SQL query through a pg8000 cursor:

>>> import pg8000
>>> conn = pg8000.connect(user='user', database='dbname', host='hosturl', port=5439, password='urpasswrd')
>>> cursor = conn.cursor()
>>> cursor.execute("CREATE TEMPORARY TABLE book (id INTEGER, title VARCHAR(256))")
>>> cursor.execute("INSERT INTO final_target SELECT * FROM book")
>>> conn.commit()
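
A fuller sketch of the flow described above, combining a Glue datasink write into a staging table with a pg8000 merge afterwards; the glueContext and datasource0 objects are assumed from the usual Glue job boilerplate, and the connection name, table names, and credentials are placeholders rather than values from the original answer:

import pg8000

# 1) load the incoming DynamicFrame into the staging table via the Glue datasink
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=datasource0,
    catalog_connection="test_red",
    connection_options={"dbtable": "my_schema.my_table_staging", "database": "redshiftdb"},
    redshift_tmp_dir="s3://s3path",
    transformation_ctx="stage_sink")

# 2) merge staging into the final table through a pg8000 cursor, then drop staging
conn = pg8000.connect(user='user', database='dbname', host='hosturl', port=5439, password='urpasswrd')
cursor = conn.cursor()
cursor.execute("delete from my_schema.my_table using my_schema.my_table_staging s where my_table.id = s.id")
cursor.execute("insert into my_schema.my_table select * from my_schema.my_table_staging")
cursor.execute("drop table if exists my_schema.my_table_staging")
conn.commit()
conn.close()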

You would need to zip the pg8000 package, put it in an S3 bucket, and reference it in the Python library path under the Advanced options / Job parameters section of the Glue job.
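
For example, here is a hedged boto3 sketch of that wiring, passing the zipped library through the --extra-py-files job parameter (the job name, role, and S3 locations are placeholders):

import boto3

glue = boto3.client("glue")
glue.create_job(
    Name="redshift-upsert-job",        # placeholder job name
    Role="MyGlueServiceRole",          # placeholder IAM role for the job
    Command={"Name": "glueetl",
             "ScriptLocation": "s3://my-bucket/scripts/upsert_job.py"},
    DefaultArguments={"--extra-py-files": "s3://my-bucket/libs/pg8000.zip"})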

winnervc