I need to read a file from S3, load data from a database, compare the data, and update a table in PostgreSQL. Below is my input file:
+---------+-------------+--------+
|tconst   |averagerating|numvotes|
+---------+-------------+--------+
|tt0000001|          5.6|    1609|
|tt0000002|          6.0|     197|
+---------+-------------+--------+
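In case it is relevant, I could also read the raw file directly instead of going through the catalog; a minimal sketch, where the S3 path is a placeholder and the pipe delimiter / header row are my assumptions about the file:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
# placeholder path; assumes a pipe-delimited file with a header row
s3_df = spark.read.csv("s3://my-bucket/ratings/title.ratings.txt", sep="|", header=True, inferSchema=True)
s3_df.printSchema()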
My table has the following columns:
tconst | averagerating | numvotes | last_changed_date | last_updated_user
My job has to read the data from S3, load the data from the table, and compare the tconst values from both sources. If a tconst from the S3 file matches a row in the table, the record should be updated in the database; otherwise a new record should be inserted. How can I achieve this? In particular, when the tconst values match, how do I perform the update operation using Glue?
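To make the intent concrete: keyed on tconst, the end state I want is what PostgreSQL's INSERT ... ON CONFLICT ... DO UPDATE gives. Outside of Glue, a minimal sketch in plain Python with psycopg2 would be something like the following (the unique constraint on tconst is my assumption, and the target table name dataset is taken from my write call further down):

import psycopg2

conn = psycopg2.connect(host="database-2.cpfn3akbkuxb.us-east-1.rds.amazonaws.com",
                        port=5432, dbname="cloud", user="postgres", password="jhhk")
cur = conn.cursor()
# Upsert a single row, keyed on tconst (assumes a unique constraint on tconst)
cur.execute("""
    INSERT INTO dataset (tconst, averagerating, numvotes, last_changed_date, last_updated_user)
    VALUES (%s, %s, %s, now(), %s)
    ON CONFLICT (tconst) DO UPDATE
    SET averagerating     = EXCLUDED.averagerating,
        numvotes          = EXCLUDED.numvotes,
        last_changed_date = EXCLUDED.last_changed_date,
        last_updated_user = EXCLUDED.last_updated_user
""", ("tt0000001", 5.6, 1609, "vinay"))
conn.commit()
cur.close()
conn.close()

My question is how to get the same behaviour from a Glue job that already has the data in DynamicFrames/DataFrames.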
Below is the code I have written so far:
from pyspark.context import SparkContext
from pyspark.sql.functions import current_timestamp, lit
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame

glueContext = GlueContext(SparkContext.getOrCreate())
spark = glueContext.spark_session
# Source data: the S3 file registered in the Glue catalog
db_name = "dataset"
tbl_dataset = "dataset_data_txt"
datasets = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_dataset)
url = "jdbc:postgresql://database-2.cpfn3akbkuxb.us-east-1.rds.amazonaws.com:5432/cloud"
db_properties = {
"driver": "org.postgresql.Driver",
"user": "postgres",
"password": "jhhk"
}
# Existing rows from the PostgreSQL table
dbdf = spark.read.jdbc(url=url, table='textdata', properties=db_properties)
datasetsDF = datasets.toDF()
lookupDF = dbdf  # already a DataFrame, no conversion needed

# Add the audit columns to the incoming S3 data
df1 = datasetsDF.alias('df1')
df1 = df1.withColumn("last_changed_date", current_timestamp())
df1 = df1.withColumn("last_updated_user", lit("vinay"))
df2 = lookupDF.alias('df2')

# tconst not present in the table -> records that will be inserted
new_filtered = df1.join(df2, df1.tconst == df2.tconst, 'left_anti')
# tconst already present in the table -> records that will be updated
update_filtered = df1.join(df2, df1.tconst == df2.tconst, 'left_semi')
newDynamicFrame = DynamicFrame.fromDF(new_filtered, glueContext, "new_records")
updateDynamicFrame = DynamicFrame.fromDF(update_filtered, glueContext, "update_records")
# Write both sets to PostgreSQL.
datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame=newDynamicFrame, catalog_connection="postgresqldb", connection_options={"dbtable": "dataset", "database": "cloud"}, transformation_ctx="datasink4")
datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame=updateDynamicFrame, catalog_connection="postgresqldb", connection_options={"dbtable": "dataset", "database": "cloud"}, transformation_ctx="datasink5")
I have gone through the link SPARK SQL - update MySql table using DataFrames and JDBC, but it does not have a solution for PySpark with Glue.
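One approach I am considering (not sure it is the right pattern for Glue, so corrections are welcome): keep the append-only Glue writer for the new records, write the update candidates to a staging table instead of the target table, and then run one set-based UPDATE over a direct connection. A rough sketch, building on update_filtered and glueContext from the code above, and assuming psycopg2 can be packaged with the job (e.g. via --additional-python-modules) and that a staging table dataset_stage with the same columns already exists:

# Stage the rows whose tconst already exists in the target table
stageDynamicFrame = DynamicFrame.fromDF(update_filtered, glueContext, "stage_records")
glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=stageDynamicFrame,
    catalog_connection="postgresqldb",
    connection_options={"dbtable": "dataset_stage", "database": "cloud"})

import psycopg2  # assumed to be packaged with the job

conn = psycopg2.connect(host="database-2.cpfn3akbkuxb.us-east-1.rds.amazonaws.com",
                        port=5432, dbname="cloud", user="postgres", password="jhhk")
with conn, conn.cursor() as cur:
    # Apply a single set-based update from the staging table, then clear it
    cur.execute("""
        UPDATE dataset AS d
        SET averagerating     = s.averagerating,
            numvotes          = s.numvotes,
            last_changed_date = s.last_changed_date,
            last_updated_user = s.last_updated_user
        FROM dataset_stage AS s
        WHERE d.tconst = s.tconst
    """)
    cur.execute("TRUNCATE dataset_stage")
conn.close()

Is this a reasonable way to do updates from Glue, or is there a more idiomatic approach?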