
I need to read a file from S3, load data from a DB, compare the data, and update a table in PostgreSQL. Below is my input file:

    +---------+-------------+--------+
    |   tconst|averagerating|numvotes|
    +---------+-------------+--------+
    |tt0000001|          5.6|    1609|
    |tt0000002|          6.0|     197|
    +---------+-------------+--------+

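For reference, the file is pipe-delimited with a header row; reading it directly would look something like the sketch below (the S3 path is a placeholder, and in the actual job I go through the Glue Data Catalog instead):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Placeholder path; the real file is registered in the Glue Data Catalog
    ratings_df = (
        spark.read
        .option("header", True)
        .option("sep", "|")
        .option("inferSchema", True)
        .csv("s3://my-bucket/ratings/ratings.txt")
    )
    ratings_df.show()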
My table has the below columns:

    tconst | averagerating | numvotes | last_changed_date | last_updated_user

My job has to read the data from S3, load the data from the table, and compare the tconst values from the S3 file and the DB. If a tconst in the table matches one in the S3 file, then update the record in the DB; otherwise, insert a new record. This is what I need to do; can you please let me know how to achieve it?

If the tconst value in the DB and in the S3 file match, the record should be updated. How do I do an update operation using Glue?
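To make the comparison concrete, the split I am after looks roughly like the standalone sketch below (the rows are made up just to illustrate; the real job would use the S3 and JDBC DataFrames):

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Toy stand-ins for the S3 file and the DB table (made-up rows, illustration only)
    s3_df = spark.createDataFrame(
        [("tt0000001", 5.6, 1609), ("tt0000002", 6.0, 197)],
        ["tconst", "averagerating", "numvotes"],
    )
    db_df = spark.createDataFrame(
        [("tt0000001", 5.5, 1500)],
        ["tconst", "averagerating", "numvotes"],
    )

    # tconst not present in the DB -> these rows should be inserted
    new_rows = s3_df.join(db_df.select("tconst"), on="tconst", how="left_anti")

    # tconst present in both -> these rows should be updated (the part I don't know how to do from Glue)
    update_rows = s3_df.join(db_df.select("tconst"), on="tconst", how="inner")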

Below is the code that I have written so far:

    from awsglue.context import GlueContext
    from awsglue.dynamicframe import DynamicFrame
    from pyspark.context import SparkContext
    from pyspark.sql.functions import current_timestamp, lit

    glueContext = GlueContext(SparkContext.getOrCreate())
    spark = glueContext.spark_session

    db_name = "dataset"
    tbl_dataset = "dataset_data_txt"

    # S3 data via the Glue Data Catalog
    datasets = glueContext.create_dynamic_frame.from_catalog(database=db_name, table_name=tbl_dataset)

    # Existing rows from the PostgreSQL table
    url = "jdbc:postgresql://database-2.cpfn3akbkuxb.us-east-1.rds.amazonaws.com:5432/cloud"
    db_properties = {
        "driver": "org.postgresql.Driver",
        "user": "postgres",
        "password": "jhhk"
    }
    dbdf = spark.read.jdbc(url=url, table="textdata", properties=db_properties)

    datasetsDF = datasets.toDF()
    df1 = datasetsDF.withColumn("last_changed_date", current_timestamp()) \
                    .withColumn("last_updated_user", lit("vinay")) \
                    .alias("df1")
    df2 = dbdf.alias("df2")

    # records whose tconst is not in the DB -> to be inserted
    new_filtered = df1.join(df2, df1.tconst == df2.tconst, "left_anti").select("df1.*")
    # records whose tconst already exists in the DB -> to be updated
    update_filtered = df1.join(df2, df1.tconst == df2.tconst).select("df1.*")

    newDynamicFrame = DynamicFrame.fromDF(new_filtered, glueContext, "new_records")
    updateDynamicFrame = DynamicFrame.fromDF(update_filtered, glueContext, "update_records")

    # Write to PostgreSQL (as far as I can tell this only appends rows, it does not update existing ones)
    datasink4 = glueContext.write_dynamic_frame.from_jdbc_conf(frame=newDynamicFrame, catalog_connection="postgresqldb", connection_options={"dbtable": "dataset", "database": "cloud"}, transformation_ctx="datasink4")
    datasink5 = glueContext.write_dynamic_frame.from_jdbc_conf(frame=updateDynamicFrame, catalog_connection="postgresqldb", connection_options={"dbtable": "dataset", "database": "cloud"}, transformation_ctx="datasink5")

I have gone through the link SPARK SQL - update MySql table using DataFrames and JDBC, but it does not have a solution for PySpark with Glue.
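The closest idea I have so far is to collect the matched rows on the driver and issue UPDATE statements through psycopg2, something like the sketch below (the connection details, table name, and user value are placeholders, and psycopg2 would have to be made available to the Glue job, e.g. via --additional-python-modules). Is there a more Glue-native way to do this?

    import psycopg2
    from psycopg2.extras import execute_batch

    def update_existing_rows(update_df):
        # Pull the matched rows back to the driver; only reasonable for small volumes
        rows = [(r["averagerating"], r["numvotes"], r["tconst"]) for r in update_df.collect()]
        conn = psycopg2.connect(
            host="database-2.cpfn3akbkuxb.us-east-1.rds.amazonaws.com",
            port=5432,
            dbname="cloud",
            user="postgres",
            password="****",
        )
        try:
            with conn.cursor() as cur:
                # Run one parameterized UPDATE per matched row, in batches
                execute_batch(
                    cur,
                    """
                    UPDATE textdata
                       SET averagerating = %s,
                           numvotes = %s,
                           last_changed_date = now(),
                           last_updated_user = 'vinay'
                     WHERE tconst = %s
                    """,
                    rows,
                )
            conn.commit()
        finally:
            conn.close()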
