
I'm trying to insert and update some data in RDS MySQL, and I would prefer to perform the upsert from my PySpark job using a staging table. Could someone please guide me on the logic for the staging table, updating column data, and finally merging into MySQL?
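To make the staging-table idea concrete, this is roughly the housekeeping I have in mind before each load (a minimal sketch only; pymysql, the connection details, and the names my_target_table / my_target_table_stg are placeholders of mine, not real code):

import pymysql  # assumption: any MySQL client reachable from the Spark driver would do

# Create the staging table once and empty it before each run (placeholder names).
conn = pymysql.connect(host="<host>", user="<user>", password="<password>", database="<db>")
with conn.cursor() as cur:
    # CREATE TABLE ... LIKE clones the target's structure, so staging
    # always matches the target's columns
    cur.execute("CREATE TABLE IF NOT EXISTS my_target_table_stg LIKE my_target_table")
    # start each run from an empty staging table
    cur.execute("TRUNCATE TABLE my_target_table_stg")
conn.commit()
conn.close()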

Edited my question as suggested:

I know, and have seen in other responses, that it is currently not directly possible to do row/record-level updates to a database table from Spark. But I have a new requirement from our client to provide an UPSERT implementation in the current ETL, which has all been done using PySpark.

Migrating the whole ETL is not possible/feasible, and this is not a decision I can take currently, since all the ETLs are in production. This is just an add-on feature to be provided to the existing code.

My database is RDS Aurora, and I need to process records that have a primary key defined in the table and then perform either an UPDATE or an INSERT. The insert is not a problem, I think, since we have the "append" save mode; I need suggestions for the UPDATE of existing records.
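For the insert side, what I already have working is a plain JDBC append from a DataFrame, along these lines (the URL, credentials, and new_records_df, standing in for the DataFrame of incoming rows, are placeholders):

# Plain inserts already work with "append" mode via the JDBC writer.
new_records_df.write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://<aurora-endpoint>:3306/<db>") \
    .option("driver", "com.mysql.jdbc.Driver") \
    .option("dbtable", "my_target_table") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .mode("append") \
    .save()

What I am missing is the equivalent for rows whose primary key already exists in the target.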

I would appreciate it if someone could share some ideas / pseudo code for this.

OK, please note that due to an NDA I cannot provide the exact code snippets, but this is the situation, recreated using a sample database, scripts, etc.

from pyspark.context import SparkContext
from awsglue.context import GlueContext

# Create a Glue context
glueContext = GlueContext(SparkContext.getOrCreate())

# Create a DynamicFrame from the 'zip_terr_txt' catalog table (current data)
zipterr_dynamicframe = glueContext.create_dynamic_frame.from_catalog(database="terr_db", table_name="zip_terr_txt")

# Create a DynamicFrame from the 'zip_terr_new_txt' catalog table (incoming data)
zipterr_new_dynamicframe = glueContext.create_dynamic_frame.from_catalog(database="terr_db", table_name="zip_terr_new_txt")

# Print out information about this data
print("Count: ", zipterr_dynamicframe.count())
# Count:  5
zipterr_dynamicframe.toDF().show()
+--------+--------------+--------------+-----+------------------+-------------------+---------+----------+
|zip_code|territory_code|territory_name|state|business_unit_code|   field_force_name|is_active|is_deleted|
+--------+--------------+--------------+-----+------------------+-------------------+---------+----------+
|    xxx1|         81A01|  TERR NAME 01|   NJ|              BCBU|FIELD FORCE NAME 01|        Y|         N|
|    xxx2|         81A01|  TERR NAME 01|   NJ|              BCBU|FIELD FORCE NAME 02|        Y|         N|
|    xxx3|         81A01|  TERR NAME 01|   NJ|              BCBU|FIELD FORCE NAME 03|        Y|         N|
|    xxx4|         81A01|  TERR NAME 01|   CA|              BCBU|FIELD FORCE NAME 04|        Y|         N|
|    xxx5|         81A01|  TERR NAME 01|   ME|              BCBU|FIELD FORCE NAME 05|        Y|         N|
+--------+--------------+--------------+-----+------------------+-------------------+---------+----------+

# Print out information about this data
print("Count: ", zipterr_new_dynamicframe.count())
# Count:  3
zipterr_new_dynamicframe.toDF().show()
+--------+--------------+--------------+-----+------------------+-------------------+---------+----------+
|zip_code|territory_code|territory_name|state|business_unit_code|   field_force_name|is_active|is_deleted|
+--------+--------------+--------------+-----+------------------+-------------------+---------+----------+
|    xxx1|         81A01|  TERR NAME 55|   NY|              BDGA|FIELD FORCE NAME 25|        Y|         N|
|    xxx2|         81A01|  TERR NAME 55|   NY|              BDGA|FIELD FORCE NAME 25|        Y|         N|
|    x103|         81A01|  TERR NAME 01|   NJ|              BCBU|FIELD FORCE NAME 03|        Y|         N|
+--------+--------------+--------------+-----+------------------+-------------------+---------+----------+

In the display of zipterr_new_dynamicframe above, there are two updates, namely to xxx1 and xxx2, and one new record, x103. I am looking for suggestions on how to upsert this scenario into the Aurora RDS table. I saw some reference code for this here which is in Java, but would like to see whether that approach can be done using PySpark; a rough sketch of what I have in mind follows.
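For reference, the end-to-end flow I am imagining for the frames above, as an untested sketch (pymysql, the target/staging table names zip_terr / zip_terr_stg, and the connection details are my own placeholders, continuing the staging-table setup from the start of the question):

import pymysql  # same assumption as earlier: a MySQL client on the driver

# 1) Land the incoming records in the (pre-created, truncated) staging table.
zipterr_new_dynamicframe.toDF().write \
    .format("jdbc") \
    .option("url", "jdbc:mysql://<aurora-endpoint>:3306/terr_db") \
    .option("dbtable", "zip_terr_stg") \
    .option("user", "<user>") \
    .option("password", "<password>") \
    .mode("append") \
    .save()

# 2) Merge staging into the target in one statement. Rows whose primary key
#    (zip_code) already exists are UPDATEd; new keys (like x103) are INSERTed.
merge_sql = """
    INSERT INTO zip_terr
        (zip_code, territory_code, territory_name, state,
         business_unit_code, field_force_name, is_active, is_deleted)
    SELECT zip_code, territory_code, territory_name, state,
           business_unit_code, field_force_name, is_active, is_deleted
    FROM zip_terr_stg
    ON DUPLICATE KEY UPDATE
        territory_code     = VALUES(territory_code),
        territory_name     = VALUES(territory_name),
        state              = VALUES(state),
        business_unit_code = VALUES(business_unit_code),
        field_force_name   = VALUES(field_force_name),
        is_active          = VALUES(is_active),
        is_deleted         = VALUES(is_deleted)
"""

conn = pymysql.connect(host="<aurora-endpoint>", user="<user>", password="<password>", database="terr_db")
try:
    with conn.cursor() as cur:
        cur.execute(merge_sql)
    conn.commit()
finally:
    conn.close()

The point of the pattern is that the row-level UPDATE happens inside MySQL itself, so Spark only ever does bulk appends, which sidesteps the JDBC writer's lack of record-level updates.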

Hope I am clear.

Thanks

Yuva
  • Could you please edit and share with us what you have done so far, either your MySQL scripts or just your table scripts? Also, I didn't understand whether you need the MySQL upsert script or the PySpark script as well. – Andrew Paes Jun 11 '18 at 01:32
  • Hi Andrew, I updated my question with a sample scenario created using the AWS Glue context. Is it possible to run/execute MySQL upsert scripts from within the Glue/PySpark environment? Thanks – Yuva Jun 11 '18 at 02:27
  • Before answering: did you see that Stack Overflow suggests this link as an answer to your question? https://stackoverflow.com/questions/35634853/spark-sql-update-mysql-table-using-dataframes-and-jdbc – Andrew Paes Jun 11 '18 at 12:34
  • I did see that, Andrew, and I had mentioned it in my question as well. There was a solution provided using Java; I was wondering whether we can do this using PySpark, or if it is possible at all. I have yet to attempt the Java code in PySpark. Thanks – Yuva Jun 11 '18 at 20:36
