My question is almost the same as this one: Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically
However, the solution from @Michael Heil didn't work for my code.
Another similar question is: How can I update a broadcast variable in spark streaming?
In that question, @Aastha proposed another solution, which also does not work for me.
I am building a PySpark structured streaming data analysis application. What I need to do is load streaming data (stream_df) from Kafka and filter it by joining it with another DataFrame, say static_df. The static_df is updated periodically.
At first, I thought I could simply update static_df, since it looks like a common case. However, after trying every solution I could find, any update to static_df has no impact on my streaming process (of course, static_df is a global variable).
Below is my code (placeholders like <kafka_servers> stand in for some unimportant values).
- Pull & parse data from a Kafka topic
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import requests

spark = SparkSession \
    .builder \
    .getOrCreate()

stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<kafka_servers>") \
    .option("subscribe", "<kafka_topic>") \
    .load()

# decode & parse data
# where `domain` contains valid or invalid domains
# and we need to keep only the valid domains with an inner join
stream_df = stream_df \
    .select("domain", "msg")  # ...after decoding & parsing the Kafka messages
- Create static_df from an API call
df_api_url = spark \
    .createDataFrame(
        [("<api_url>",)], ["url"]
    ) \
    .cache()

# A udf that returns `StructType()` results
udf = F.udf(lambda url: requests.get(url).json(), <schema>)

def get_static_df():
    # column `domain` is an array containing lots of valid domains
    ret = df_api_url \
        .select(udf("url").alias("result")) \
        .select("result.domain")
    return ret
static_df = get_static_df().cache()
- Filter stream_df by joining with static_df and do some analysis
stream_df = stream_df \
    .join(F.broadcast(static_df), "domain", "inner")
# do some analysis here

# output results to console for debugging
query1 = stream_df \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", "false") \
    .start()
Everything goes fine until now. But we need to update static_df periodically, since the valid domains change from time to time.
- Update static_df every hour
def update_static_df(batch_df, batch_id):
    global static_df  # rebind the module-level variable
    if batch_id == 0:
        return
    static_df.unpersist()
    static_df = get_static_df().cache()

# Here we use a rate stream to periodically run the func `update_static_df`
query2 = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .option("numPartitions", num_partitions) \
    .load() \
    .writeStream \
    .trigger(processingTime="3600 seconds") \
    .foreachBatch(update_static_df) \
    .start()
The code in step 4 is just one of my attempts; I also tried constructing a BroadcastWrapper and updating the broadcast variable periodically by setting a TTL, roughly along the lines of the sketch below.
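This is only a rough sketch of that attempt (not my exact code); get_valid_domains() is a placeholder for collecting the domain list returned by the API:
import time

# Rough sketch of the BroadcastWrapper attempt (assumed shape, not my exact code).
# `get_valid_domains()` is a placeholder for fetching the domain list from the API.
class BroadcastWrapper:
    def __init__(self, spark, ttl_seconds=3600):
        self.spark = spark
        self.ttl_seconds = ttl_seconds
        self.last_update = 0
        self.bc = None

    def get(self):
        now = time.time()
        if self.bc is None or now - self.last_update > self.ttl_seconds:
            if self.bc is not None:
                self.bc.unpersist()
            # re-broadcast the latest list of valid domains
            self.bc = self.spark.sparkContext.broadcast(get_valid_domains())
            self.last_update = now
        return self.bc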
When the valid domains change, the output from query1 remains unchanged and still reflects the domains that were in static_df before the update.
My guess is that once a structured streaming query starts, a DAG is created and every static variable (including cached DataFrames) is "frozen" by Spark, so any change made to those static variables has no impact on the structured streaming process.
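A minimal sketch of what I mean, reusing the names from my code above (this is my understanding, not verified Spark internals):
# The join below captures the plan (and cached data) of static_df when the query starts...
joined = stream_df.join(F.broadcast(static_df), "domain", "inner")
query = joined.writeStream.format("console").outputMode("append").start()

# ...so rebinding the Python name afterwards only changes the driver-side
# variable; the running query keeps joining against the old, cached data.
static_df = get_static_df().cache()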
The only solution I found is to run the join inside foreachBatch:
def join_and_print(batch_df, batch_id):
    batch_df = batch_df \
        .join(F.broadcast(static_df), "domain", "inner")
    print(str(batch_df.collect()))

# stream_df from step 1
query3 = stream_df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(join_and_print) \
    .start()
But this solution is not suitable for every situation, for example a groupBy with a time window after the join, as in the sketch below.
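For illustration, this is the kind of query I mean (it assumes the parsed stream from step 1 also keeps Kafka's timestamp column, which my code above drops): the windowed aggregation needs streaming state across micro-batches, so it cannot simply be moved into foreachBatch as a plain batch groupBy.
# Hypothetical example: a time-windowed aggregation after the stream-static join.
# Assumes `stream_df` (the parsed stream from step 1) still has the Kafka `timestamp` column.
windowed = stream_df \
    .join(F.broadcast(static_df), "domain", "inner") \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(F.window("timestamp", "5 minutes"), "domain") \
    .count()

query4 = windowed \
    .writeStream \
    .outputMode("append") \
    .format("console") \
    .start()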
So, is there any advice for my problem?