30

Here are some bullet points in terms of how I have things setup:

  • I have CSV files uploaded to S3 and a Glue crawler setup to create the table and schema.
  • I have a Glue job setup that writes the data from the Glue table to our Amazon Redshift database using a JDBC connection. The Job also is in charge of mapping the columns and creating the redshift table.

By re-running a job, I am getting duplicate rows in redshift (as expected). However, is there way to replace or delete rows before inserting the new data, using a key or the partitions setup in glue?

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

from awsglue.dynamicframe import DynamicFrame
from awsglue.transforms import SelectFields

from pyspark.sql.functions import lit

## @params: [TempDir, JOB_NAME]
args = getResolvedOptions(sys.argv, ['TempDir','JOB_NAME'])

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

columnMapping = [
    ("id", "int", "id", "int"),
    ("name", "string", "name", "string"),
]

datasource1 = glueContext.create_dynamic_frame.from_catalog(database = "db01", table_name = "table01", transformation_ctx = "datasource0")

applymapping1 = ApplyMapping.apply(frame = datasource1, mappings = columnMapping, transformation_ctx = "applymapping1")
resolvechoice1 = ResolveChoice.apply(frame = applymapping1, choice = "make_cols", transformation_ctx = "resolvechoice1")
dropnullfields1 = DropNullFields.apply(frame = resolvechoice1, transformation_ctx = "dropnullfields1")
df1 = dropnullfields1.toDF()
data1 = df1.withColumn('platform', lit('test'))
data1 = DynamicFrame.fromDF(data1, glueContext, "data_tmp1")

## Write data to redshift
datasink1 = glueContext.write_dynamic_frame.from_jdbc_conf(frame = data1, catalog_connection = "Test Connection", connection_options = {"dbtable": "table01", "database": "db01"}, redshift_tmp_dir = args["TempDir"], transformation_ctx = "datasink1")

job.commit()
Sam Firke
  • 21,571
  • 9
  • 87
  • 105
krchun
  • 994
  • 1
  • 9
  • 19
  • 2
    Good question, running into the same issue right now. Did you make any progress thus far? – Matthijs Sep 19 '17 at 15:09
  • 3
    I was in contact with AWS Glue Support and was able to get a work around. It does not appear glue has a way to do this, or was never meant for this type of work. The way I was able to get a working solution was to have glue insert all rows into a staging table and then perform a upsert/merge outside of glue. – krchun Sep 20 '17 at 15:16

6 Answers6

21

Job bookmarks are the key. Just edit the job and enable "Job bookmarks" and it won't process already processed data. Note that the job has to rerun once before it will detect it does not have to reprocess the old data again.

For more info see: http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

The name "bookmark" is a bit far fetched in my opinion. I would have never looked at it if I did not coincidentally stumble upon it during my search.

Matthijs
  • 1,315
  • 12
  • 12
  • 1
    I'm not really sure why you were down voted. Job bookmarking is the equivalent of checkpointing in spark which sounds like it would the problem. – Leyth G Nov 27 '17 at 19:02
  • 2
    I don't know exactly either. The only reason I can think of is that rerunning the same job (by clearing the bookmark for instance) might result in double records in Redshift because the batch is processed again. – Matthijs Nov 28 '17 at 11:52
  • 3
    Have you actually got it working? I know it is supposed to do what you said, but I couldn't get it working. I have a catalog table as input (created by a crawler over a Parquet data set in S3), a simple mapping step and Redshift as data sink. Job bookmark is enabled as default and all Job runs also have it enabled. Still duplicates all the data on every run. – andresp Feb 02 '18 at 15:45
  • 2
    Yes its working for me. I have a crawler that crawls every day. And a trigger (a couple of hours later so the crawler is finished) with option --job-bookmark-option: job-bookmark-enable. We did not use Parquet, not sure if that makes a difference. All in all my experience with Glue is not that great though: Jobs fail when too large, I could not get custom Python scripts working. We are looking for alternatives. – Matthijs Feb 05 '18 at 13:26
  • Indeed. Just tested with JSON as input and it works. I've reported the bug with Parquet to AWS. – andresp Feb 05 '18 at 19:17
  • Just got the confirmation from AWS. Job bookmarking is not supported with Parquet or ORC as input. There is a feature request for this but they can't provide an ETA. It's unfortunate that this is not mentioned in the Job bookmarking documentation. – andresp Feb 07 '18 at 10:40
  • Hey, somewhere I heard that the Job Bookmark only works in S3, we can't use it for databases(with JDBC). Because S3 files are immutable. So they can understand if the file get updated. Is my understand correct? – TheDataGuy Nov 13 '19 at 01:39
  • @Bhuvanesh not sure if I follow you there. If your source is JDBC then I'm pretty sure Job bookmarks won't work. – Matthijs Nov 13 '19 at 11:00
10

This was the solution I got from AWS Glue Support:

As you may know, although you can create primary keys, Redshift doesn't enforce uniqueness. Therefore, if you are rerunning Glue jobs then duplicate rows can get inserted. Some of the ways to maintain uniqueness are:

  1. Use a staging table to insert all rows and then perform a upsert/merge [1] into the main table, this has to be done outside of glue.

  2. Add another column in your redshift table [1], like an insert timestamp, to allow duplicate but to know which one came first or last and then delete the duplicate afterwards if you need to.

  3. Load the previously inserted data into dataframe and then compare the data to be insert to avoid inserting duplicates[3]

[1] - http://docs.aws.amazon.com/redshift/latest/dg/c_best-practices-upsert.html and http://www.silota.com/blog/amazon-redshift-upsert-support-staging-table-replace-rows/

[2] - https://github.com/databricks/spark-redshift/issues/238

[3] - https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html

Sandeep Singh
  • 432
  • 6
  • 17
krchun
  • 994
  • 1
  • 9
  • 19
  • Did you check the Job bookmarks? If your source is S3, it might be enough. If it is not working for you I'd like to know what issues you are running into, so I wont make the same mistakes? – Matthijs Sep 20 '17 at 20:33
  • 1
    I just tried using Glue where my data source and data destination are both in Amazon Redshift. Enabling bookmarks didn't help me. The data has duplicates. – deepak.prathapani Aug 05 '19 at 19:11
  • Yeah, bookmarks wasn't the answer for us either. We have since moved away from using AWS glue though so unfortunately its as far as I got. – krchun Aug 06 '19 at 21:00
7

Please check this answer. There is explanation and code sample how to upsert data into Redshift using staging table. The same approach can be used to run any SQL queries before or after Glue writes data using preactions and postactions options:

// Write data to staging table in Redshift
glueContext.getJDBCSink(
  catalogConnection = "redshift-glue-connections-test",
  options = JsonOptions(Map(
    "database" -> "conndb",
    "dbtable" -> staging,
    "overwrite" -> "true",
    "preactions" -> "<another SQL queries>",
    "postactions" -> "<some SQL queries>"
  )),
  redshiftTmpDir = tempDir,
  transformationContext = "redshift-output"
).writeDynamicFrame(datasetDf)
Yuriy Bondaruk
  • 4,512
  • 2
  • 33
  • 49
5

Today I have tested and got a workaround to update/delete from the target table using JDBC connection.

I have used as below

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

import pg8000
args = getResolvedOptions(sys.argv, [
    'JOB_NAME',
    'PW',
    'HOST',
    'USER',
    'DB'
])
# ...
# Create Spark & Glue context

sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# ...
config_port = ****
conn = pg8000.connect(
    database=args['DB'], 
    user=args['USER'], 
    password=args['PW'],
    host=args['HOST'],
    port=config_port
)
query = "UPDATE table .....;"

cur = conn.cursor()
cur.execute(query)
conn.commit()
cur.close()



query1 = "DELETE  AAA FROM  AAA A, BBB B WHERE  A.id = B.id"

cur1 = conn.cursor()
cur1.execute(query1)
conn.commit()
cur1.close()
conn.close()
BigData-Guru
  • 1,161
  • 1
  • 15
  • 20
  • I will test this out, have you tried psycopg2 instead of pg8000 ? – Kunal Oct 30 '18 at 08:48
  • Yes. psycopg2 is not yet supported as written in C. libraries such as pandas are not supported at the present time, nor are extensions written in other languages.. – BigData-Guru Oct 31 '18 at 07:41
  • For pg8000 to work in Glue did you have to make any include any external libraries? – Kunal Oct 31 '18 at 17:20
0

Job bookmarking option in Glue should do the trick , as suggested above . I have been using it successfully when my source is S3. http://docs.aws.amazon.com/glue/latest/dg/monitor-continuations.html

0

As per my testing (with the same scenario), the BOOKMARK functionality is not working. Duplicate data is getting inserted when the Job is run multiple times. I have got this issue resolved by removing the files from the S3 location daily (through lambda) and implementing Staging & Target tables. data will get insert/update based on the matching key columns.

BigData-Guru
  • 1,161
  • 1
  • 15
  • 20