
At one point, using EMR 5.2.1 (which I have stuck with for over a year and a half now), I was able to write to a Postgres database from within Spark using:

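    # Assumed imports (defined elsewhere in the module, not shown in this snippet):
    # import json
    # import logging
    # from datetime import datetime
    # from pyspark.sql import Row
    # from pyspark.sql.utils import IllegalArgumentException
    # from py4j.protocol import Py4JJavaError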
    try:
        df = self._spark.createDataFrame([Row(id=str(self.uuid),
                                              datetime=datetime.now(),
                                              metadata=json.dumps(self._metadata))])

        df.write.jdbc(url, table, properties={"driver": "org.postgresql.Driver"})
        return self
    except AttributeError as e:
        logging.error(e)
    except ReferenceError as e:
        logging.error(e)
    except ValueError as e:
        logging.error(e)
    except Py4JJavaError as e:
        logging.error(e)
    except IllegalArgumentException as e:
        logging.error(e)
    return None
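
For reference, url and table are defined elsewhere in the class; roughly speaking they are something like the following, with the host, database name, and credentials replaced by placeholders:

    # Placeholder connection details, not the real host or credentials
    url = "jdbc:postgresql://my-host:5432/my_database?user=my_user&password=my_password"
    table = "my_table"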

This is not working on EMR 5.12.0 and I can't figure out what the problem is. I have looked at this summary of JDBC/PySpark but don't see any obvious answer there:

How to use JDBC source to write and read data in (Py)Spark?

Here is the configuration I'm submitting to EMR:

    Configurations=[
        {
            "Classification": "spark",
            "Properties": {
                "maximizeResourceAllocation": "true"
            },
            "Configurations": []},
        {
            "Classification": "spark-defaults",
            "Properties": {
                "spark.rpc.message.maxSize": "768",
                "spark.driver.maxResultSize": "4500m",
                "spark.jars": "/home/hadoop/postgresql-9.4.1210.jre7.jar",
                "spark.executor.extraClassPath": "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/home/hadoop/postgresql-9.4.1210.jre7.jar",
                "spark.driver.extraClassPath": "/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/emr/emrfs/conf:/usr/share/aws/emr/emrfs/lib/*:/usr/share/aws/emr/emrfs/auxlib/*:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/home/hadoop/postgresql-9.4.1210.jre7.jar",
                "spark.driver.userClassPathFirst": "true",
                "spark.executor.userClassPathFirst": "true"
            },
            "Configurations": []
        }
    ],
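
For context, this list is passed when the cluster is launched; a rough sketch of the surrounding call, assuming boto3's run_job_flow is used (the cluster name, roles, and instance settings below are placeholders, not my actual values):

    import boto3

    # Sketch only: everything except the Configurations list is a placeholder
    configurations = [...]  # the list shown above
    emr = boto3.client("emr")
    emr.run_job_flow(
        Name="my-spark-cluster",
        ReleaseLabel="emr-5.12.0",
        Applications=[{"Name": "Spark"}],
        Configurations=configurations,
        Instances={
            "MasterInstanceType": "m4.xlarge",
            "SlaveInstanceType": "m4.xlarge",
            "InstanceCount": 3,
            "KeepJobFlowAliveWhenNoSteps": True,
        },
        JobFlowRole="EMR_EC2_DefaultRole",
        ServiceRole="EMR_DefaultRole",
    )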

I've tried many different combinations of those arguments, omitting one or more, but so far nothing has worked. Finally, if I run it locally I'm also not able to write to the db, and I get the following error message:

    2018-03-13 14:58:55,808 root ERROR An error occurred while calling o1319.jdbc.
    : scala.MatchError: null
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:62)
    at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:472)
    at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:48)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
    at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116)
    at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:92)
    at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:92)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:610)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:233)
    at org.apache.spark.sql.DataFrameWriter.jdbc(DataFrameWriter.scala:461)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:280)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:745)

In this case I've tried to add the spark.driver.extraClassPath config:

    from pyspark.sql import SparkSession

    self._spark = SparkSession \
        .builder \
        .appName(self._app_name) \
        .config("spark.driver.extraClassPath", "/path/to/postgresql-9.4-1201.jdbc4.jar") \
        .getOrCreate()
Evan Zamir
  • How is it failing? Can you include a stack trace? – ernest_k Mar 10 '18 at 10:22
  • @ErnestKiwele Do you mean the stderr log file that is produced? – Evan Zamir Mar 13 '18 at 20:43
  • Yes, the error information with the trace-back/stack-trace – ernest_k Mar 13 '18 at 20:44
  • @ErnestKiwele Ok, that's the thing. I've looked through it and there is actually no error being recorded. All I know is it is not writing to the db. Any ideas? Do you know of anything related to writing out to a db that has changed since 5.2.1? – Evan Zamir Mar 13 '18 at 20:46
  • @ErnestKiwele I have added some info about the configs that I'm submitting. Perhaps I am not putting values in the right section? – Evan Zamir Mar 13 '18 at 20:52

1 Answer


I found the issue. It was the following line:

    df.write.jdbc(url, table, properties={"driver": "org.postgresql.Driver"})

The issue is that if the table already exists, this call generates an error. To fix this, add mode='append':

    df.write.jdbc(url, table, properties={"driver": "org.postgresql.Driver"}, mode='append')

A couple of interesting things to point out. I filed a bug report, because the functional way of specifying append mode triggers the same error:

    df.write.mode('append').jdbc(url, table, properties={"driver": "org.postgresql.Driver"})

Furthermore, in previous versions of Spark on EMR I had not been specifying append mode, and yet it was still able to write to the database. I do not know when this behavior changed; more likely, there was a bug that was fixed at some point. At any rate, this was hard to track down, and I hope my answer helps someone coming across the same problem.
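
Putting it together, a working version of the write looks roughly like this (the connection details are placeholders; the important part is the explicit mode='append'):

    # Placeholder connection details; the fix is passing mode='append'
    url = "jdbc:postgresql://my-host:5432/my_database"
    properties = {"driver": "org.postgresql.Driver", "user": "my_user", "password": "my_password"}

    df.write.jdbc(url, "my_table", mode='append', properties=properties)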

Evan Zamir