
I have a dataframe in Databricks called customerDetails.

+--------------------+-----------+
|        customerName| customerId|
+--------------------+-----------+
|John Smith          |       0001|
|Jane Burns          |       0002|
|Frank Jones         |       0003|
+--------------------+-----------+

I would like to be able to copy this from Databricks to a table within Postgres.

I found this post, which uses psycopg2 to insert individual rows into Postgres. How can I copy each row of the DataFrame into the Postgres table?

import psycopg2

v1 = 'testing_name'
v2 = 'testing_id'


conn = psycopg2.connect(host="HOST_NAME",
                        port="PORT",
                        user="USER_NAME",
                        password="PASSWORD",
                        database="DATABASE_NAME")

cursor = conn.cursor()
cursor.execute("INSERT INTO customerTable (customerName, customerId) VALUES(%s, %s)", (v1, v2))
conn.commit()
cursor.close()
conn.close()

1 Answer


You can insert, row by row, all the data into your table.

See the documentation for cursor.executemany too: you can reshape your data into a list of tuples and pass the whole list as the second argument.

The code will be almost identical to the example you gave:

cursor = conn.cursor()
def append_to_table(row):
    cursor.execute("INSERT INTO customerTable (customerName, customerId) VALUES(%s, %s)", (row.customerName, row.customerId))

df.rdd.map(append_to_table)
conn.commit()
cursor.close()
conn.close()
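
Alternatively, if the DataFrame is small enough to collect onto the driver, a minimal sketch of the cursor.executemany route mentioned above could look like this (the connection details are placeholders, and customerDetails / customerTable are taken from your question):

import psycopg2

conn = psycopg2.connect(host="HOST_NAME",
                        port="PORT",
                        user="USER_NAME",
                        password="PASSWORD",
                        database="DATABASE_NAME")

# Collect the rows to the driver and reshape them into a list of tuples
rows = [(row.customerName, row.customerId) for row in customerDetails.collect()]

cursor = conn.cursor()
cursor.executemany("INSERT INTO customerTable (customerName, customerId) VALUES (%s, %s)", rows)
conn.commit()
cursor.close()
conn.close()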
  • Thanks for the reply, I tried this and received an error: 'DataFrame' object has no attribute 'map' – Data_101 Apr 24 '18 at 16:04
  • oops, try using the `rdd` conversion – Steven Black Apr 24 '18 at 16:09
  • Thanks. It runs without an error now, only it doesn't seem to actually copy anything to the table. – Data_101 Apr 24 '18 at 16:16
  • hmm, try printing out the `row.customerName` to make sure it's actually running through each row – Steven Black Apr 24 '18 at 16:17
  • it doesn't seem to be printing anything. – Data_101 Apr 24 '18 at 16:27
  • hmm, try using `df.foreach` instead of `map`, see [the docs](http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame.foreach) – Steven Black Apr 24 '18 at 16:28
  • Not sure this will work as you surely need a separate connection/cursor object on each partition? If that is the case, you could modify the code to create a connection per partition, write all the data in the partition and then kill the connection, using mapPartition. – ags29 Apr 25 '18 at 11:30
  • alternatively, you can try a built-in method like [this](https://stackoverflow.com/questions/38825836/write-spark-dataframe-to-postgres-database) – ags29 Apr 25 '18 at 11:30
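
Following up on the last two comments, a rough sketch of the per-partition approach might look like this. The connection details, table name, and column names are placeholders taken from the question, and psycopg2 has to be installed on the executors:

def write_partition(rows):
    import psycopg2
    # One connection per partition, created on the executor rather than the driver
    conn = psycopg2.connect(host="HOST_NAME",
                            port="PORT",
                            user="USER_NAME",
                            password="PASSWORD",
                            database="DATABASE_NAME")
    cursor = conn.cursor()
    for row in rows:
        cursor.execute("INSERT INTO customerTable (customerName, customerId) VALUES (%s, %s)",
                       (row.customerName, row.customerId))
    conn.commit()
    cursor.close()
    conn.close()

customerDetails.foreachPartition(write_partition)

And a sketch of the built-in JDBC writer linked in the last comment, assuming the Postgres JDBC driver is available on the cluster:

customerDetails.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://HOST_NAME:PORT/DATABASE_NAME") \
    .option("dbtable", "customerTable") \
    .option("user", "USER_NAME") \
    .option("password", "PASSWORD") \
    .mode("append") \
    .save()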