
I am writing a PySpark Structured Streaming application (Spark 3.0.1) and I'm trying to periodically refresh a static DataFrame from a JDBC source. I followed the approach described in this post, using a rate stream:
Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically

However, as soon as the first unpersist occurs (whether with or without blocking=True), the following persist is ignored, and from then on the DataFrame is read from the JDBC source on each trigger instead of being served from cache.
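To confirm this, I log the cache state from inside the batch function with something like the following (a minimal check; is_cached and storageLevel are standard DataFrame attributes):

def log_cache_state(df):
    # After the first refresh this reports is_cached=False on every
    # trigger, even though persist() was called again.
    print(f"is_cached={df.is_cached}, storageLevel={df.storageLevel}")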

This is what my code looks like:

# Read static dataframe
static_df = spark.read \
    .option("multiline", True) \
    .format("jdbc") \
    .option("url", JDBC_URL) \
    .option("user", USERNAME) \
    .option("password", PASSWORD) \
    .option("numPartitions", NUMPARTITIONS) \
    .option("query", QUERY) \
    .load() \
    .na.drop(subset=[MY_COL]) \
    .repartition(MY_COL) \
    .persist()

# Create rate stream
staticRefreshStream = spark.readStream.format("rate") \
    .option("rowsPerSecond", 1) \
    .option("numPartitions", 1) \
    .load()

def foreachBatchRefresher(batch_df, batch_id):
    global static_df
    print("Refreshing static table")
    static_df.unpersist()
    static_df = spark.read \
        .option("multiline", True) \
        .format("jdbc") \
        .option("url", JDBC_URL) \
        .option("user", USERNAME) \
        .option("password", PASSWORD) \
        .option("numPartitions", NUMPARTITIONS) \
        .option("query", QUERY) \
        .load() \
        .na.drop(subset=[MY_COL]) \
        .repartition(MY_COL) \
        .persist()

# start() returns a StreamingQuery; awaitTermination() is called on it,
# not on the streaming DataFrame itself
refresh_query = staticRefreshStream.writeStream \
    .format("console") \
    .outputMode("append") \
    .queryName("RefreshStaticDF") \
    .foreachBatch(foreachBatchRefresher) \
    .trigger(processingTime='1 minutes') \
    .start()

refresh_query.awaitTermination()

The other parts, including reading the streaming DataFrame, the DataFrame transformations, and writing to a sink, are omitted (a simplified sketch is included below for context). Any idea what I'm missing?
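For context, the omitted stream-static join has roughly this shape (a simplified sketch with placeholder names such as KAFKA_SERVERS, KAFKA_TOPIC, OUTPUT_PATH and CHECKPOINT_PATH, not my actual code):

# Read the streaming side (placeholder source and options)
stream_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_SERVERS) \
    .option("subscribe", KAFKA_TOPIC) \
    .load()
# ... parse the Kafka value into columns, including MY_COL ...

# Stream-static join against the (supposedly cached) static_df
joined_df = stream_df.join(static_df, on=MY_COL, how="inner")

join_query = joined_df.writeStream \
    .format("parquet") \
    .option("path", OUTPUT_PATH) \
    .option("checkpointLocation", CHECKPOINT_PATH) \
    .start()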
