2

I am trying to sink results processed by Structured Streaming API in Spark to PostgreSQL. I tried the following approach (somehow simplified, but hope it's clear):

class Processor:

    def __init__(self, args):
        self.spark_session = SparkSession \
                             .builder \
                             .appName("processor") \
                             .config("spark.sql.shuffle.partitions", 4) \
                             .config("spark.sql.streaming.checkpointLocation", "/tmp/spark-checkpoint") \
                             .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.1") \
                             .config("spark.driver.extraClassPath", os.path.join(os.path.split(os.path.abspath(__file__))[0], "postgresql-42.3.6.jar")) \
                             .config("spark.jars", os.path.join(os.path.split(os.path.abspath(__file__))[0], "postgresql-42.3.6.jar")) \
                             .getOrCreate()
        self.db_url = args.db_url
        self.db_user = args.db_user
        self.db_password = args.db_password

    def get_data_to_publish(self):
        return self.spark_session.readStream # processing of data goes here ...

    def write_output_data(self, data):
        return data \
                .writeStream \
                .outputMode("append") \
                .foreachBatch(self.update_table) \
                .start()

    def update_table(self, data, batch_id):
        data \
        .write \
        .jdbc(url=self.db_url, table="output_table", mode="overwrite", properties={"user": self.db_user, "password": self.db_password})
               
    def process(self):
        result = self.get_data_to_publish()

        q = self.write_output_data(result)
        q.awaitTermination()

def __name__ == "__main__":
    args = # ... parsing args ...
    processor = Processor(args)
    processor.process()

I run this in the standalone mode with Spark engine deployed as Docker container). Unfortunatelly I get the following error (I left out the whole Java stack trace, it is quite long):

org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 4 times, most recent failure: Lost task 0.3 in stage 1.0 (TID 3) (192.168.0.2 executor 0): java.lang.ClassCastException: cannot assign instance of java.lang.invoke.SerializedLambda to field org.apache.spark.sql.execution.ExpandExec.projection of type scala.Function1 in instance of org.apache.spark.sql.execution.ExpandExec

What am I doing wrong? Is there anything I could do to fix this error?

papi
  • 23
  • 4

0 Answers0