I have a Spark job that processes the data pretty fast, but when it tries to write the result into the PostgreSQL database, it is quite slow. Here is most of the relevant code:
import psycopg2
import psycopg2.extras

def save_df_to_db(records):
    # each item in records is a dictionary with 'url', 'tag', 'value' as keys
    db_conn = psycopg2.connect(connect_string)
    db_conn.autocommit = True
    cur = db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
    upsert_query = """INSERT INTO mytable (url, tag, value)
                      VALUES (%(url)s, %(tag)s, %(value)s)
                      ON CONFLICT (url, tag) DO UPDATE SET value = %(value)s"""
    try:
        cur.executemany(upsert_query, records)
    except Exception as e:
        print("Error in executing save_df_to_db:", e)
data = [...] # initial data
rdd = sc.parallelize(data)
rdd = ... # Some simple RDD transforms...
rdd.foreachPartition(save_df_to_db)
The table also has a unique constraint on (url, tag). I am looking for ways to improve the speed of this code; any suggestion or recommendation is welcome.
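One idea I have been considering (but have not benchmarked) is replacing executemany with psycopg2.extras.execute_values, so each partition sends a few multi-row INSERT statements instead of one statement per record. A rough sketch of what I mean, assuming psycopg2 2.7+ for execute_values and reusing connect_string from above (save_partition_to_db is just a hypothetical variant of my function):

def save_partition_to_db(records):
    # sketch only: batch the whole partition into multi-row INSERTs
    db_conn = psycopg2.connect(connect_string)
    cur = db_conn.cursor()
    upsert_query = """INSERT INTO mytable (url, tag, value)
                      VALUES %s
                      ON CONFLICT (url, tag) DO UPDATE SET value = EXCLUDED.value"""
    try:
        # execute_values expands the single %s into pages of row tuples,
        # so far fewer round trips than executemany's one statement per record
        psycopg2.extras.execute_values(
            cur,
            upsert_query,
            ((r['url'], r['tag'], r['value']) for r in records),
            page_size=1000)
        db_conn.commit()
    except Exception as e:
        print("Error in executing save_partition_to_db:", e)
    finally:
        cur.close()
        db_conn.close()

rdd.foreachPartition(save_partition_to_db)

I am not sure whether the per-record statements or the per-partition connection setup is the bigger cost here, so any guidance on whether this is the right direction is appreciated.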