My question is almost the same as this one: Stream-Static Join: How to refresh (unpersist/persist) static Dataframe periodically

However, the solution from @Michael Heil didn't work for my code.

Another similar question is: How can I update a broadcast variable in spark streaming?

In that question, @Aastha proposed another solution, which also does not work for me.

I am building a PySpark Structured Streaming data analysis application. What I need to do is load streaming data (stream_df) from Kafka and filter it by joining it with another DataFrame, say static_df. And static_df is updated periodically.

At first I thought I could simply update static_df, since it looks like a common case. However, after trying every solution I could find, I discovered that no update to static_df has any effect on my streaming process (of course, static_df is a global variable).

Below is my code (I use <...> to replace some unimportant variables):

  1. Pull & parse data from a Kafka topic
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import requests

spark = SparkSession \
    .builder \
    .getOrCreate()

stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "<kafka_servers>") \
    .option("subscribe", "<kafka_topic>") \
    .load()

# decode & parse data:
# `domain` may contain valid or invalid domains,
# and we keep only the valid ones via the inner join in step 3
stream_df = stream_df \
    .select("domain", "msg")  # ...after decoding & parsing the kafka messages
  2. Create static_df from an API call
df_api_url = spark \
    .createDataFrame(
        [("<api_url>",)], ["url"]
    ) \
    .cache()

# A udf that returns `StructType()` results
udf = F.udf(lambda url: requests.get(url).json(), <schema>)

def get_static_df():
    # `result.domain` is an array containing lots of valid domains;
    # explode it so the join key is a single domain per row
    ret = df_api_url \
        .select(udf("url").alias("result")) \
        .select(F.explode("result.domain").alias("domain"))
    return ret

static_df = get_static_df().cache()
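(To make the shape concrete: static_df is a one-column DataFrame of valid domains, conceptually equivalent to the hand-built example below; the values are made up.)

# what static_df conceptually looks like -- made-up values, for illustration only
example_static_df = spark.createDataFrame(
    [("example.com",), ("foo.org",), ("bar.net",)],
    ["domain"],
)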
  3. Filter stream_df by joining it with static_df and do some analysis
stream_df = stream_df \
    .join(F.broadcast(static_df), "domain", "inner")

# do some analysis here

# output results to console for debugging
query1 = stream_df \
    .writeStream \
    .format("console") \
    .outputMode("append") \
    .option("truncate", "false") \
    .start()

Everything works fine up to this point. Now I need to update static_df periodically, since the set of valid domains changes from time to time.

  4. Update static_df every hour
def update_static_df(batch_df, batch_id):
    global static_df
    if batch_id == 0:
        return
    static_df.unpersist()
    static_df = get_static_df().cache()
   
# Here we use a rate stream to periodically run the func `update_static_df`
query2 = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 1) \
    .option("numPartitions", num_partitions) \
    .load() \
    .writeStream \
    .trigger(processingTime="3600 seconds") \
    .foreachBatch(update_static_df) \
    .start()
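(Both queries run in the same SparkSession; to keep the driver alive, the script would typically end with a call like this:)

# block until one of the streaming queries terminates
spark.streams.awaitAnyTermination()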

The code in step 4 is just one of my attempts; I also tried constructing a BroadcastWrapper and updating the broadcast variable periodically by setting a TTL.
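For reference, a minimal sketch of that TTL idea (the class name, the value() method, and collecting the domains to the driver are simplifications of mine, not taken from the linked answer):

import time

class BroadcastWrapper:
    """Re-broadcast the valid domains once the TTL has expired (sketch)."""
    def __init__(self, spark, ttl_seconds=3600):
        self.sc = spark.sparkContext
        self.ttl = ttl_seconds
        self.last_update = time.time()
        self.bc = self.sc.broadcast(self._fetch())

    def _fetch(self):
        # collect the current valid domains to the driver
        return [row["domain"] for row in get_static_df().collect()]

    def value(self):
        # refresh the broadcast when the TTL has expired
        if time.time() - self.last_update > self.ttl:
            self.bc.unpersist()
            self.bc = self.sc.broadcast(self._fetch())
            self.last_update = time.time()
        return self.bc.value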

When the valid domains change, the output of query1 stays the same and keeps showing the domains that were in static_df before the update.

My guess is that once a Structured Streaming query starts, a DAG is created and every static variable (including cached DataFrames) is "frozen" by Spark, so any change made to those static variables has no impact on the running streaming process.

The only solution I found is to run the join inside foreachBatch:

def join_and_print(batch_df, batch_id):
    batch_df = batch_df \
        .join(F.broadcast(static_df), "domain", "inner")
    print(str(batch_df.collect()))

# stream_df from step 1
query3 = stream_df \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(join_and_print) \
    .start()

But this solution is not suitable for every situation, such as a groupBy with a time window after the join (see the sketch below).
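For instance, what I would like to express directly on the stream (and what the foreachBatch workaround does not cover) is something along these lines; the timestamp column and the window/watermark durations are placeholders of mine:

# join, then a windowed aggregation on the stream itself
# (`timestamp` and the durations below are placeholders)
windowed_df = stream_df \
    .join(F.broadcast(static_df), "domain", "inner") \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(F.window("timestamp", "5 minutes"), "domain") \
    .count()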

So, is there any advice for my problem?
