I'm trying to determine the schema of a JSON Kafka topic. To do that, I lifted a piece of code from this blog: https://medium.com/wehkamp-techblog/streaming-kafka-topic-to-delta-table-s3-with-spark-structured-streaming-2bb3027c7565.
Background: I have a topic streaming JSON messages that are base64-encoded, so I can decode the value using unbase64(value) and cast it to a string, which gives me the required JSON as a string (a minimal sketch of that step is shown next). Because I have to flatten this JSON string without any information about its schema, I'm using the function that follows the sketch.
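In isolation, the decode step looks like this (a minimal sketch; `df` stands for the raw Kafka DataFrame):

```python
from pyspark.sql.functions import expr

# Kafka delivers `value` as binary; the payload itself is base64 text,
# so decode it first and then cast the result to a string.
df_decoded = df.withColumn("value", expr("string(unbase64(value))"))
```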
```python
from pyspark.sql.functions import col, expr

def infer_topic_schema_json(topic):
    df_json = (spark.read
        .format("kafka")
        .option("kafka.bootstrap.servers", kafka_broker)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .option("endingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
        # decode the base64 payload, cast to string, and filter out empty values
        .withColumn("value", expr("string(unbase64(value))"))
        .filter(col("value").isNotNull())
        # keep only the latest version of each record (max offset per key)
        .select("key", expr("struct(offset, value) r"))
        .groupBy("key").agg(expr("max(r) r"))
        .select("r.value"))

    # infer the schema from the decoded JSON strings
    df_read = spark.read.json(
        df_json.rdd.map(lambda x: x.value), multiLine=True)

    # drop corrupt records (keep only rows that parsed cleanly)
    if "_corrupt_record" in df_read.columns:
        df_read = (df_read
            .filter(col("_corrupt_record").isNull())
            .drop("_corrupt_record"))

    return df_read.schema.json()
```
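For context, this is how I then apply the inferred schema downstream (a sketch of the standard `from_json` pattern; the topic name `my-topic` is a placeholder):

```python
import json

from pyspark.sql.functions import col, expr, from_json
from pyspark.sql.types import StructType

# Parse the inferred schema string back into a StructType...
schema = StructType.fromJson(json.loads(infer_topic_schema_json("my-topic")))

# ...and use it to flatten the incoming JSON strings into columns.
df_flat = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_broker)
    .option("subscribe", "my-topic")
    .load()
    .withColumn("value", expr("string(unbase64(value))"))
    .withColumn("json", from_json(col("value"), schema))
    .select("json.*"))
```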
However, this code takes far too long to run in Databricks (as much as 3 hours) for a topic that has fewer than 1,000 messages so far. The configuration of the cluster I'm using is below:
- Workers: 2-10 (Standard E16ds_v4), 256-1280 GB memory, 32-160 cores, 12-44 DBU/h
- Driver: 128 GB memory, 16 cores
- Runtime: 10.4.x-scala2.12
Any suggestions on how to speed this up? It isn't feasible to let the schema-inference function run for 3+ hours, especially since I already know the schema will change quite frequently (once every day or two).
I already tried reading a small batch of just 2 records to see if that would speed things up, but it did not. The plan is also to maintain a checkpoint in the batch function above so that I don't have to reprocess messages that were already handled when I rerun it to update the schema; a sketch of that idea follows.
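The rough idea for that checkpoint is to persist the highest offset per partition after each run and replay it as `startingOffsets` on the next one (a sketch only; the path and topic name are placeholders, and batch Kafka reads don't support `checkpointLocation`, so offsets have to be tracked manually):

```python
def load_starting_offsets(path):
    # Read back the offsets JSON saved by the previous run, e.g.
    # '{"my-topic":{"0":42,"1":17}}'; fall back to the beginning
    # of the topic on the first run.
    try:
        with open(path) as f:
            return f.read()
    except FileNotFoundError:
        return "earliest"

starting = load_starting_offsets("/dbfs/tmp/schema_offsets.json")
df = (spark.read
    .format("kafka")
    .option("kafka.bootstrap.servers", kafka_broker)
    .option("subscribe", "my-topic")
    .option("startingOffsets", starting)
    .option("endingOffsets", "latest")
    .load())
```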