On EMR 5.2.1 (which I have stuck with for over a year and a half now) I was able to write to a Postgres database from within Spark using:
import json
import logging
from datetime import datetime

from pyspark.sql import Row
from pyspark.sql.utils import IllegalArgumentException
from py4j.protocol import Py4JJavaError

try:
    # Build a single-row DataFrame and write it over JDBC.
    df = self._spark.createDataFrame([Row(id=str(self.uuid),
                                          datetime=datetime.now(),
                                          metadata=json.dumps(self._metadata))])
    df.write.jdbc(url, table, properties={"driver": "org.postgresql.Driver"})
    return self
except (AttributeError, ReferenceError, ValueError,
        Py4JJavaError, IllegalArgumentException) as e:
    logging.error(e)
return None
This no longer works on EMR 5.12.0, and I can't figure out what the problem is. I have looked at this summary of JDBC/PySpark but don't see an obvious answer there:
How to use JDBC source to write and read data in (Py)Spark?
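For reference, the options-based form of the same write described in that summary looks like this (a minimal sketch; the url, table name, and credentials below are placeholders, not my real values):

# Equivalent options-based JDBC write (sketch); the url, dbtable,
# user, and password values are placeholders.
df.write \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost:5432/mydb") \
    .option("dbtable", "public.my_table") \
    .option("driver", "org.postgresql.Driver") \
    .option("user", "user") \
    .option("password", "password") \
    .mode("append") \
    .save()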
Here is the configuration I'm submitting to EMR:
Configurations=[
    {
        "Classification": "spark",
        "Properties": {
            "maximizeResourceAllocation": "true"
        },
        "Configurations": []
    },
    {
        "Classification": "spark-defaults",
        "Properties": {
            "spark.rpc.message.maxSize": "768",
            "spark.driver.maxResultSize": "4500m",
            "spark.jars": "/home/hadoop/postgresql-9.4.1210.jre7.jar",
            "spark.executor.extraClassPath": "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/home/hadoop/postgresql-9.4.1210.jre7.jar",
            "spark.driver.extraClassPath": "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/home/hadoop/postgresql-9.4.1210.jre7.jar",
            "spark.driver.userClassPathFirst": "true",
            "spark.executor.userClassPathFirst": "true"
        },
        "Configurations": []
    }
],
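(That list is passed as the Configurations argument when launching the cluster; a minimal sketch of the boto3 run_job_flow call, where the cluster name, release label, and instance settings are placeholders:)

import boto3

# Sketch of how the Configurations list above is submitted to EMR;
# the name, release label, instances, and roles are placeholder values.
emr = boto3.client("emr")
emr.run_job_flow(
    Name="my-cluster",
    ReleaseLabel="emr-5.12.0",
    Instances={
        "MasterInstanceType": "m4.large",
        "SlaveInstanceType": "m4.large",
        "InstanceCount": 3,
    },
    Configurations=[...],  # the list shown above
    JobFlowRole="EMR_EC2_DefaultRole",
    ServiceRole="EMR_DefaultRole",
)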
I've tried many different combinations of those arguments, omitting one or more, but so far nothing has worked. Finally, if I run it locally I'm also unable to write to the database, and I get the following error message:
2018-03-13 14:58:55,808 root ERROR An error occurred while calling o1319.jdbc.
: scala.MatchError: null
at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:62)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:461)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:280)
at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
at py4j.commands.CallCommand.execute(CallCommand.java:79)
at py4j.GatewayConnection.run(GatewayConnection.java:214)
at java.lang.Thread.run(Thread.java:745)
In this case I've tried to add the spark.driver.extraClassPath config:
self._spark = SparkSession \
    .builder \
    .appName(self._app_name) \
    .config("spark.driver.extraClassPath", "/path/to/postgresql-9.4-1201.jdbc4.jar") \
    .getOrCreate()
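For the local run, an alternative I could try is handing the driver jar to spark.jars, or resolving it from Maven with spark.jars.packages; a sketch, where the jar path and the Maven coordinate are example values on my part:

from pyspark.sql import SparkSession

# Sketch: ship the Postgres driver jar to both driver and executors.
# The local path and the commented Maven coordinate are example values.
spark = SparkSession \
    .builder \
    .appName("jdbc-test") \
    .config("spark.jars", "/path/to/postgresql-9.4-1201.jdbc4.jar") \
    .getOrCreate()

# or, instead of a local path, resolve the driver from Maven Central:
# .config("spark.jars.packages", "org.postgresql:postgresql:9.4.1210")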