I have the following sample code. The actual dataframe has about 1.1 million rows and is roughly 100 MB in size. When run on AWS Glue with 10 or 30 DPUs, the code takes about 1.15 minutes, which I believe is too long.
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import mysql.connector

# Spark/Glue setup so the sample runs standalone
sc = SparkContext.getOrCreate()
sqlContext = SQLContext(sc)

df2 = sqlContext.createDataFrame([("xxx1","81A01","TERR NAME 55","NY"),("xxx2","81A01","TERR NAME 55","NY"),("x103","81A01","TERR NAME 01","NJ")], ["zip_code","territory_code","territory_name","state"])

# Pull every row to the driver, then insert them one at a time
upsert_list = df2.collect()

db = mysql.connector.connect(host="xxxxxxxx.us-east-1.rds.amazonaws.com", user="user1", password="userpasswd", database="ziplist")
cursor = db.cursor()
insertQry = "INSERT INTO TEMP1 (zip_code, territory_code, territory_name, state) VALUES (%s, %s, %s, %s);"
for r in upsert_list:
    cursor.execute(insertQry, (r.zip_code, r.territory_code, r.territory_name, r.state))
db.commit()
db.close()
FOR UPSERTS:
# Single bulk upsert from the TEMP1 staging table into TEMP2,
# run over the same connection as above
cursor = db.cursor()
insertQry = "INSERT INTO TEMP2 (zip_code, territory_code, territory_name, state, city) SELECT tmp.zip_code, tmp.territory_code, tmp.territory_name, tmp.state, tmp.city FROM TEMP1 tmp ON DUPLICATE KEY UPDATE territory_name = tmp.territory_name, state = tmp.state, city = tmp.city;"
cursor.execute(insertQry)
print("CURSOR status:", cursor.rowcount)
db.commit()
db.close()
I found that the bottleneck is the df2.collect() call, which pulls the entire dataframe to the driver, so all of the inserts run on a single node. I would like to implement parallelism so that the database writes happen across multiple nodes. Is there any way to do this?

How can I implement parallelism for the RDS upserts? Please share some sample code or pseudocode.
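To make the question concrete, below is the kind of per-partition write I have in mind. It is only a sketch under my own assumptions: the write_partition helper name, the batch size of 1000, and the use of foreachPartition with one mysql.connector connection per partition are my guesses, and the host/credentials are the same placeholders as above. Is this the right direction, or is there a better pattern?

import mysql.connector

def write_partition(rows):
    # Hypothetical helper: runs on each executor with one connection per
    # partition, batching inserts into the TEMP1 staging table instead of
    # collecting everything to the driver.
    db = mysql.connector.connect(host="xxxxxxxx.us-east-1.rds.amazonaws.com", user="user1", password="userpasswd", database="ziplist")
    cursor = db.cursor()
    insertQry = "INSERT INTO TEMP1 (zip_code, territory_code, territory_name, state) VALUES (%s, %s, %s, %s);"
    batch = []
    for r in rows:
        batch.append((r.zip_code, r.territory_code, r.territory_name, r.state))
        if len(batch) >= 1000:  # assumed batch size; flush to cut round trips
            cursor.executemany(insertQry, batch)
            db.commit()
            batch = []
    if batch:  # flush the remaining rows
        cursor.executemany(insertQry, batch)
        db.commit()
    cursor.close()
    db.close()

# Each partition would be written by an executor in parallel; the single
# INSERT ... ON DUPLICATE KEY UPDATE into TEMP2 would then run from the
# driver as before.
df2.foreachPartition(write_partition)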
Thanks