
I can connect to Redshift with psycopg2 by:

import psycopg2
conn = psycopg2.connect(host=__credential__.host_redshift, 
                        dbname=__credential__.dbname_redshift,
                        user=__credential__.user_redshift, 
                        password=__credential__.password_redshift,
                        port=__credential__.port_redshift)
cur = conn.cursor()

Also, I can update an existing table in the database with:

cur.execute("""
    UPDATE tb
    SET col2='updated_target_row'
    WHERE col1='target_row';
""")
conn.commit()

Now, I'd like to update the table in Redshift with Rows from a Spark DataFrame. I looked this up and found a fairly recent question about it (which, I'd like to stress, is not a duplicate of this one).

The solution seems pretty straightforward. However, I cannot even pass the Row object to a method that uses the cursor.

What I am trying now:

def update_info(row):
    # this references the driver-side cursor `cur` created above
    cur.execute("""
        UPDATE tb
        SET col2='updated_target_row'
        WHERE col1='target_row';
    """)

df.rdd.foreach(update_info)
conn.commit()

And I get this error:

TypeError: can't pickle psycopg2.extensions.cursor objects

Interestingly, this doesn't seem to be a common issue. Any help is appreciated.

P.S.:

  1. Versions:

    python=3.6
    pyspark=2.2.0
    psycopg2=2.7.4
    
  2. The full error message can be found on Pastebin.

  3. I have tried `rdd.map` instead of `rdd.foreach`, with no luck.

TritonX114

1 Answer


Connection objects and cursors are not serializable and cannot be sent to the workers. You should use foreachPartition instead:

def update_info(rows):
    # open a fresh connection and cursor on the worker, once per partition
    conn = psycopg2.connect(...)
    cur = conn.cursor()

    for row in rows:
        cur.execute(...)

df.rdd.foreachPartition(update_info)
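
For the question's table, a fleshed-out version of that sketch might look like the one below. The table and column names (tb, col1, col2) and the __credential__ values come from the question; the Row field names and the per-partition commit are my assumptions, since each worker opens its own connection and the driver-side conn.commit() won't apply to it:

import psycopg2
import __credential__  # must be importable on the workers, just like psycopg2

def update_info(rows):
    # One connection per partition, opened on the worker.
    conn = psycopg2.connect(host=__credential__.host_redshift,
                            dbname=__credential__.dbname_redshift,
                            user=__credential__.user_redshift,
                            password=__credential__.password_redshift,
                            port=__credential__.port_redshift)
    cur = conn.cursor()

    for row in rows:
        # Parameterized UPDATE; assumes each Row exposes fields named col1 and col2.
        cur.execute("UPDATE tb SET col2 = %s WHERE col1 = %s;",
                    (row.col2, row.col1))

    # Commit on the worker; committing on the driver's connection
    # has no effect on connections opened inside the partitions.
    conn.commit()
    cur.close()
    conn.close()

df.rdd.foreachPartition(update_info)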
Alper t. Turker
  • Thanks so much! Could you remind me how I can import psycopg2 on each worker? I get a "Module not found" error with your solution (and also with `import psycopg2` inside the method). I have the psycopg2 package installed on all my worker nodes. – TritonX114 May 07 '18 at 19:54
  • A global import should be enough. You can import inside the function, but it shouldn't be necessary. If you get module-not-found errors, please double-check that the workers are configured to use the expected environment / `PYTHONPATH`. – Alper t. Turker May 07 '18 at 20:13
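
On that last point, a rough sketch (mine, not from the thread) of one way to point the executors at an interpreter and PYTHONPATH that actually have psycopg2 installed; both paths below are placeholders to adjust to your cluster layout:

import os
from pyspark.sql import SparkSession

# Must be set before the SparkContext/SparkSession is created.
os.environ["PYSPARK_PYTHON"] = "/usr/bin/python3.6"  # placeholder interpreter path

spark = (SparkSession.builder
         # spark.executorEnv.<VAR> sets an environment variable on the executors.
         .config("spark.executorEnv.PYTHONPATH", "/opt/libs/site-packages")  # placeholder
         .getOrCreate())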