
What am I doing?

I want to build an API service using Flask to extract data from one database, do some data analysis and then load the new data into a separate DB.

What is wrong?

If I run Spark by itself, I can access the database, perform the analysis, and load the results back. But the same functions do not work when called from a Flask application (API routes).

How am I doing it?

First I start the Spark master and worker. I can see one worker registered under the master at localhost:8080.

export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)

../sbin/start-master.sh
../sbin/start-slave.sh spark://xxx.local:7077

For the Flask application:

from flask import Flask
from pyspark.sql import SparkSession

app = Flask(__name__)

# Create the SparkSession once, at application start-up
spark = SparkSession\
    .builder\
    .appName("Flark - Flask on Spark")\
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


@app.route("/")
def hello():
    dataframe = spark.read.format("jdbc").options(
        url="jdbc:postgresql://localhost/foodnome_dev?user=postgres&password=''",
        database="foodnome_test",
        dbtable='"Dishes"'
    ).load()

    print([row["description"]
           for row in dataframe.select('description').collect()])

    return "hello"

To run this application, I pass the PostgreSQL JDBC driver to spark-submit:

../bin/spark-submit --master spark://Leos-MacBook-Pro.local:7077 --driver-class-path postgresql-42.2.5.jar server.py

What error do I get?

On the Flask side, the error is Internal Server Error. On the Spark side, the traceback is:

File "/Users/leoqiu/Desktop/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
    format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o36.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.0.0.67, executor 0): java.lang.ClassNotFoundException: org.postgresql.Driver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:55)
    at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
    at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)

2 Answers

1

--driver-class-path is not sufficient here. The driver jar should be added to the executor classpath as well. Both are typically handled together using one of:

  • spark.jars.packages / --packages
  • spark.jars / --jars

though you can still use spark.executor.extraClassPath.
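
For example, the jar (or its Maven coordinates) can be attached when the session is built, which distributes it to the driver and every executor. This is only a sketch, assuming a local postgresql-42.2.5.jar next to the script as in the question:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("Flark - Flask on Spark")
         # ship a local jar to the driver and all executors
         .config("spark.jars", "postgresql-42.2.5.jar")
         # or resolve it from Maven Central instead of a local file:
         # .config("spark.jars.packages", "org.postgresql:postgresql:42.2.5")
         .getOrCreate())

Note that these options must be set before the SparkContext is created, so they belong in the builder (or on the spark-submit command line), not on an existing session.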

Explanation:

With a JDBC source, the driver is responsible for reading the metadata (the schema), while the executors handle the actual data retrieval.

This behavior is common to other external data sources as well, so whenever you use a non-built-in format you should distribute the corresponding jars across the cluster.
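
A rough way to see the split, assuming the same session, URL, and table as in the question: with only --driver-class-path, the driver-side schema read can succeed while any action that makes the executors fetch rows fails with ClassNotFoundException.

# using the same `spark` session as in the question
df = spark.read.format("jdbc").options(
    url="jdbc:postgresql://localhost/foodnome_dev?user=postgres&password=''",
    dbtable='"Dishes"'
).load()

df.printSchema()   # schema is read on the driver -- works with --driver-class-path alone
df.collect()       # executors must load org.postgresql.Driver -- fails without --jars/--packages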

See also

How to use JDBC source to write and read data in (Py)Spark?

0

Here is what worked for me, as suggested. It needs --jars:

../bin/spark-submit --master spark://Leos-MacBook-Pro.local:7077 --driver-class-path postgresql-42.2.5.jar --jars postgresql-42.2.5.jar server.py
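
If you would rather not ship the local jar, the same thing should also work with --packages, which resolves the driver from Maven Central (untested here, but equivalent in intent):

../bin/spark-submit --master spark://Leos-MacBook-Pro.local:7077 \
    --packages org.postgresql:postgresql:42.2.5 \
    server.py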