I am trying to sink results processed by the Structured Streaming API in Spark to PostgreSQL. I tried the following approach (somewhat simplified, but I hope it's clear):
class Processor:
    """Streams processed data through Spark and writes each micro-batch to a JDBC table."""

    def __init__(self, args):
        """Create the Spark session and remember the database connection settings.

        ``args`` must expose ``db_url``, ``db_user`` and ``db_password``.
        """
        # The PostgreSQL driver jar is looked up next to this source file and
        # handed to Spark both as a driver classpath entry and as a job jar.
        jdbc_jar = os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "postgresql-42.3.6.jar"
        )
        builder = (
            SparkSession.builder
            .appName("processor")
            .config("spark.sql.shuffle.partitions", 4)
            .config("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoint")
            .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1")
            .config("spark.driver.extraClassPath", jdbc_jar)
            .config("spark.jars", jdbc_jar)
        )
        self.spark_session = builder.getOrCreate()
        self.db_url = args.db_url
        self.db_user = args.db_user
        self.db_password = args.db_password

    def get_data_to_publish(self):
        """Return the streaming source that will be published."""
        return self.spark_session.readStream  # processing of data goes here ...

    def write_output_data(self, data):
        """Start a streaming query that passes every micro-batch to ``update_table``.

        Returns whatever ``start()`` returns (the running query handle).
        """
        stream_writer = data.writeStream.outputMode("append")
        per_batch_writer = stream_writer.foreachBatch(self.update_table)
        return per_batch_writer.start()

    def update_table(self, data, batch_id):
        """Write one micro-batch to ``output_table`` over JDBC.

        ``batch_id`` is accepted because ``foreachBatch`` supplies it; it is
        not used here.
        """
        credentials = {"user": self.db_user, "password": self.db_password}
        # NOTE(review): "overwrite" is applied on every micro-batch, so each
        # batch presumably replaces the table's previous contents -- confirm
        # this is intended rather than "append".
        data.write.jdbc(
            url=self.db_url,
            table="output_table",
            mode="overwrite",
            properties=credentials,
        )

    def process(self):
        """Run the pipeline and block until the streaming query terminates."""
        query = self.write_output_data(self.get_data_to_publish())
        query.awaitTermination()
def __name__ == "__main__":
args = # ... parsing args ...
processor = Processor(args)
processor.process()
I run this in standalone mode (with the Spark engine deployed as a Docker container). Unfortunately, I get the following error (I left out the full Java stack trace, as it is quite long):
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 3) (192.168.0.2 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.execution.ExpandExec.projection of type scala.Function1 in instance of org.apache.spark.sql.execution.ExpandExec
What am I doing wrong? Is there anything I could do to fix this error?