What am I doing?
I want to build an API service using Flask that extracts data from one database, performs some data analysis, and then loads the results into a separate database.
What is wrong?
If I run Spark by itself, I can access the database, perform the analysis, and load the results back. But the same functions fail when called from a Flask application's API routes.
How am I doing it?
First I start the Spark master and a worker. At localhost:8080 I can see one worker registered under the master.
export JAVA_HOME=$(/usr/libexec/java_home -v 1.8)
../sbin/start-master.sh
../sbin/start-slave.sh spark://xxx.local:7077
For the Flask application:
from flask import Flask
from pyspark.sql import SparkSession

app = Flask(__name__)

spark = SparkSession \
    .builder \
    .appName("Flark - Flask on Spark") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")


@app.route("/")
def hello():
    dataframe = spark.read.format("jdbc").options(
        url="jdbc:postgresql://localhost/foodnome_dev?user=postgres&password=''",
        database="foodnome_test",
        dbtable='"Dishes"'
    ).load()
    print([row["description"]
           for row in dataframe.select("description").collect()])
    return "hello"
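As an aside, the same read can also be written with the credentials as separate options instead of URL query parameters. This is only a sketch against the same local database; the explicit `driver` option is my addition, not something from the code above:

```python
# Sketch: equivalent JDBC read using explicit options rather than a
# credentialed URL. Assumes the `spark` session defined above and the
# same local PostgreSQL database.
dataframe = (
    spark.read.format("jdbc")
    .option("url", "jdbc:postgresql://localhost/foodnome_dev")
    .option("dbtable", '"Dishes"')
    .option("user", "postgres")
    .option("password", "")
    # Naming the driver class explicitly can surface classpath problems
    # on the driver instead of later on an executor.
    .option("driver", "org.postgresql.Driver")
    .load()
)
```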
To run this application, I submit it with spark-submit, passing the JDBC driver:
../bin/spark-submit --master spark://Leos-MacBook-Pro.local:7077 --driver-class-path postgresql-42.2.5.jar server.py
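For context on the flags: as I understand it, `--driver-class-path` only puts the jar on the driver JVM's classpath, while `--jars` also ships it to the executors. A variant of the same command with both flags (same paths as above, offered as an assumption, not a confirmed fix):

```shell
# Sketch: same submission, but --jars distributes postgresql-42.2.5.jar
# to every executor as well, whereas --driver-class-path alone only
# affects the driver JVM.
../bin/spark-submit \
  --master spark://Leos-MacBook-Pro.local:7077 \
  --driver-class-path postgresql-42.2.5.jar \
  --jars postgresql-42.2.5.jar \
  server.py
```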
What error do I get?
On the Flask side, the error is Internal Server Error. On the Spark side:
File "/Users/leoqiu/Desktop/spark-2.4.0-bin-hadoop2.7/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o36.collectToPython.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.0.0.67, executor 0): java.lang.ClassNotFoundException: org.postgresql.Driver
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
at org.apache.spark.sql.execution.datasources.jdbc.DriverRegistry$.register(DriverRegistry.scala:45)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:55)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:54)
at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD.compute(JDBCRDD.scala:272)