I am trying to read files from an S3 bucket and write the DataFrame to a PostgreSQL table using PySpark, but I am encountering the following error.

Code:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('sample_v2').getOrCreate()

path = ['s3a://path/sample_data.csv']
df = spark.read.csv(path, sep=',',inferSchema=True, header=True)

df.show()  # works until here; df has data

df.write.format("jdbc").option("driver","org.postgresql.Driver").option("url","jdbc:postgres://********************rds.amazonaws.com:5432;database=abc;user=abcde;password=abcdef").insertInto("test_result")

Error:

22/04/06 12:15:31 WARN HiveConf: HiveConf of name hive.stats.jdbc.timeout does not exist
22/04/06 12:15:31 WARN HiveConf: HiveConf of name hive.stats.retries.wait does not exist
22/04/06 12:15:34 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 2.3.0
22/04/06 12:15:34 WARN ObjectStore: setMetaStoreSchemaVersion called but recording version is disabled: version = 2.3.0, comment = Set by MetaStore UNKNOWN@192.168.29.14
22/04/06 12:15:34 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "C:\Softwares\spark-3.2.1-bin-hadoop3.2\spark\python\pyspark\sql\readwriter.py", line 762, in insertInto
    self._jwrite.insertInto(tableName)
  File "C:\Softwares\spark-3.2.1-bin-hadoop3.2\spark\python\lib\py4j-0.10.9.3-src.zip\py4j\java_gateway.py", line 1321, in __call__
  File "C:\Softwares\spark-3.2.1-bin-hadoop3.2\spark\python\pyspark\sql\utils.py", line 117, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Table not found: test_result;
'InsertIntoStatement 'UnresolvedRelation [test_result], [], false, false, false

How to resolve this?

JaySean
  • Have you tried `saveAsTable` instead of `insertInto`? – pltc Apr 07 '22 at 04:43
  • Hi @pltc, it gives the following error when I use `.saveAsTable("test_result")`: `pyspark.sql.utils.IllegalArgumentException: "Option 'dbtable' or 'query' is required."` – JaySean Apr 07 '22 at 05:18

2 Answers

You should use jdbc:postgresql:// instead of jdbc:postgres://.

The error says "pyspark.sql.utils.AnalysisException: Table not found: test_result;", but the underlying issue could be in establishing the connection from Spark to Postgres. You can check the connection with a simple read:


df1=spark.read.format("jdbc").option("driver","org.postgresql.Driver").option("url","jdbc:postgresql://********************rds.amazonaws.com:5432;database=abc;user=abcde;password=abcdef").option("query", "select 1").load()
df1.show()

If the above statement returns a result, the connection works and the problem lies elsewhere, for example the user may not have access to the table.

When I tried the above syntax, I got the error org.postgresql.util.PSQLException: The server requested password-based authentication, but no password was provided. while establishing the connection to Postgres, so I use the syntax below, which passes the user and password as separate options.

To read from the database:

source_db_url = "jdbc:postgresql://xxxxxxxxxxxx.rds.amazonaws.com:5432/database"
db_driver="org.postgresql.Driver"
db_user = "admin"
db_password = "admin"
df1=spark.read.format("jdbc").option("driver", db_driver).option("url", source_db_url).option("query", "select 1").option("user", db_user).option("password", db_password).load()
df1.show()

To write to the database:

source_db_url = "jdbc:postgresql://xxxxxxxxxxxx.rds.amazonaws.com:5432/database"
db_driver="org.postgresql.Driver"
db_user = "admin"
db_password = "admin"
df1.write.format("jdbc").mode("Overwrite").option("truncate", "true").option("driver", db_driver).option("url", source_db_url).option("dbtable", "public.test_result").option("user", db_user).option("password", db_password).save()
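The read and write calls above repeat the same connection settings. A minimal sketch of collecting them once in a plain dictionary and passing it with `.options(**...)`; the helper name `postgres_jdbc_options` and its parameters are hypothetical (not part of PySpark), and the host and credentials are the same placeholders as above.

```python
def postgres_jdbc_options(host, database, user, password, table, port=5432):
    """Build the option dict for Spark's JDBC source (hypothetical helper)."""
    return {
        "driver": "org.postgresql.Driver",
        # PostgreSQL URLs use the jdbc:postgresql:// scheme and a /database path.
        "url": f"jdbc:postgresql://{host}:{port}/{database}",
        # The JDBC source needs 'dbtable' (or 'query' when reading).
        "dbtable": table,
        "user": user,
        "password": password,
    }


opts = postgres_jdbc_options(
    host="xxxxxxxxxxxx.rds.amazonaws.com",
    database="database",
    user="admin",
    password="admin",
    table="public.test_result",
)
print(opts["url"])

# With a live SparkSession and DataFrame (not created here), the write would be:
# df1.write.format("jdbc").options(**opts).mode("append").save()
```

Note that the write ends with `.save()` rather than `.insertInto(...)`: the target table is named through the `dbtable` option, which is what the "Option 'dbtable' or 'query' is required" message in the comments refers to.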

If you get the error py4j.protocol.Py4JJavaError: An error occurred while calling o98.save. : java.lang.ClassNotFoundException: org.postgresql.Driver, Spark cannot find the PostgreSQL driver class.

Download the PostgreSQL JDBC driver from https://jdbc.postgresql.org/download.html and pass the jar to Spark when creating the session. Then replace the configuration values below with your own.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.jars", "/path_to_postgresDriver/postgresql-42.2.5.jar") \
    .getOrCreate()
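Since the ClassNotFoundException can simply mean that Spark did not find the jar at the given location, it may help to verify the path before building the session. A sketch, assuming the jar has already been downloaded; `resolve_driver_jar` and `build_session` are hypothetical helpers, and the path is the same placeholder as above.

```python
from pathlib import Path


def resolve_driver_jar(jar_path):
    """Return the absolute path of the driver jar, or fail early if it is missing."""
    jar = Path(jar_path).expanduser().resolve()
    if not jar.is_file():
        raise FileNotFoundError(f"PostgreSQL JDBC driver jar not found: {jar}")
    return str(jar)


def build_session(jar_path, app_name="sample_v2"):
    """Create a SparkSession with the driver jar on spark.jars (needs pyspark installed)."""
    # Imported lazily so the path check above can run without Spark.
    from pyspark.sql import SparkSession

    return (
        SparkSession.builder
        .appName(app_name)
        .config("spark.jars", resolve_driver_jar(jar_path))
        .getOrCreate()
    )


# spark = build_session("/path_to_postgresDriver/postgresql-42.2.5.jar")
```

If a session already exists in the same process, `getOrCreate()` returns it, so the jar setting may not take effect until the shell or application is restarted.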
yogesh garud
  • Hi @yogesh garud, with the below read & write syntax- error is- `py4j.protocol.Py4JJavaError: An error occurred while calling o98.save. : java.lang.ClassNotFoundException: org.postgresql.Driver` – JaySean Apr 07 '22 at 05:39
  • @JaySean, can you please provide more information on how you are running Spark (EMR, standalone, or Databricks) and which version you are using? You might need to download the driver jar and submit it when running the code. I have updated the answer. – yogesh garud Apr 07 '22 at 05:49
  • here is the link to same question where you can find multiple answers if the above one didn't work - https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql – yogesh garud Apr 07 '22 at 05:59
  • Hi @yogesh , I am running in EMR Scala version 2.11.12, OpenJDK 64-Bit Server VM, 1.8.0_322 and also locally. This is another warning I saw while running on EMR- `WARN HiveConf: HiveConf of name hive.server2.thrift.url does not exist` – JaySean Apr 07 '22 at 06:01
  • I tried https://stackoverflow.com/questions/34948296/using-pyspark-to-connect-to-postgresql, no luck – JaySean Apr 07 '22 at 06:02
  • The WARN messages should not cause any issue; it's just that Spark is not able to locate the jar you are specifying. Try placing the jar in the same directory as the code and provide the absolute path from the root folder. – yogesh garud Apr 07 '22 at 06:14
  • Let us [continue this discussion in chat](https://chat.stackoverflow.com/rooms/243676/discussion-between-yogesh-garud-and-jaysean). – yogesh garud Apr 07 '22 at 07:58
It all came down to syntax. This is what worked:

df.write.format("jdbc").option("driver", "org.postgresql.Driver").option("url","jdbc:postgresql://*************************ast-1.rds.amazonaws.com/dbname").option("port","5432").option("dbtable","public.table_name").option("user","abc").option("password","abc").save()

JaySean