I can connect to Redshift with psycopg2 like this:
import psycopg2

conn = psycopg2.connect(host=__credential__.host_redshift,
                        dbname=__credential__.dbname_redshift,
                        user=__credential__.user_redshift,
                        password=__credential__.password_redshift,
                        port=__credential__.port_redshift)
cur = conn.cursor()
Also, I can update an existing table in the database with:
cur.execute("""
UPDATE tb
SET col2='updated_target_row'
WHERE col1='target_row';
""")
conn.commit()
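For a single row, a parameterized version of the same statement also works; psycopg2 fills in the %s placeholders from a tuple, which is what I would eventually want to do with values taken from each Row:

cur.execute("UPDATE tb SET col2 = %s WHERE col1 = %s;",
            ('updated_target_row', 'target_row'))  # values passed as a tuple
conn.commit()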
Now, I'd like to update the table in Redshift with the Rows of a Spark DataFrame. I looked around and found a fairly recent question about this (which, I'd like to stress, is not a duplicate of my question at all).
The solution there seems pretty straightforward. However, I cannot even pass a Row object to a method that uses the cursor.
What I am trying now:
def update_info(row):
    cur.execute("""
        UPDATE tb
        SET col2='updated_target_row'
        WHERE col1='target_row';
    """)

df.rdd.foreach(update_info)
conn.commit()
And I got this error:
TypeError: can't pickle psycopg2.extensions.cursor objects
Interestingly, this doesn't seem to be a common issue. My guess is that Spark has to pickle update_info, together with the cur it closes over, in order to ship it to the executors, and a cursor wrapping a live connection cannot be serialized. Any help is appreciated.
P.S.:
Versions:
python=3.6, pyspark=2.2.0, psycopg2=2.7.4
The full error message can be found in pastebin.
I have tried rdd.map instead of rdd.foreach and had no luck.
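For what it's worth, the direction I'm considering next is foreachPartition with a connection opened inside each partition, so that no driver-side psycopg2 object ever needs to be pickled. This is only an untested sketch: it assumes the __credential__ module is importable on the workers and that the DataFrame really has col1/col2 columns holding the values to match and set.

def update_partition(rows):
    # Runs on the executor: the connection and cursor are created locally,
    # so nothing driver-side has to be serialized.
    import psycopg2
    conn = psycopg2.connect(host=__credential__.host_redshift,
                            dbname=__credential__.dbname_redshift,
                            user=__credential__.user_redshift,
                            password=__credential__.password_redshift,
                            port=__credential__.port_redshift)
    cur = conn.cursor()
    for row in rows:
        # Assumed column names; one UPDATE per Row.
        cur.execute("UPDATE tb SET col2 = %s WHERE col1 = %s;",
                    (row['col2'], row['col1']))
    conn.commit()
    conn.close()

df.rdd.foreachPartition(update_partition)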