I want to parse an array column whose elements look like ["key: value key1: value1 key2: value2 .. key_35: value_35\n", ""], promote each key to its own column (alongside my other columns), and fill that column with the corresponding value.
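To illustrate, here is a minimal batch-mode sketch of the reshape I am after. The sample row and the split-before-key regex are only placeholders for the example, not my real data:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, first, split, trim

spark = SparkSession.builder.getOrCreate()
# hypothetical input: one row shaped like my real data
df = spark.createDataFrame(
    [("2023-01-01", 1, ["key: v key1: v1\n", ""])],
    ["date", "id", "status"],
)
pairs = (df
    .withColumn("status", trim(col("status").getItem(0)))            # "key: v key1: v1"
    .withColumn("kv", explode(split(col("status"), r" (?=\S+:)")))   # "key: v", "key1: v1"
    .withColumn("key", split(col("kv"), ": ")[0])
    .withColumn("val", split(col("kv"), ": ")[1]))
pairs.groupBy("date", "id").pivot("key").agg(first("val")).show()
# +----------+---+---+----+
# |      date| id|key|key1|
# +----------+---+---+----+
# |2023-01-01|  1|  v|  v1|
# +----------+---+---+----+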
I am reading from the silver layer of a Delta lake and writing back to the silver layer again.
This is the code I am using, but I am getting the error: Queries with streaming sources must be executed with writeStream.start(); tahoe
from pyspark.sql.functions import col, concat, explode, first, lit, split, trim

dfInput = (spark.readStream
    .option("maxFilesPerTrigger", 1)
    .format("delta")
    .load(path)
    # keep the first element of the status array, then break it into tokens
    .withColumn("status", col("status").getItem(0))
    .select(col("date"), col("id"), col("status"))
    .withColumn("status", trim(col("status")))
    .withColumn("status", split(col("status"), " "))
)
def forEachFunc(split_df, batch_id):
    # explode the token array into key/value pairs, pivot, and append to Delta
    dfParsed = (dfInput
        .withColumn("kv", explode(col("status")))
        .withColumn("key", concat(lit("status_"), split(col("kv"), ":")[0]))
        .withColumn("val", split(col("kv"), ":")[1]))
    (dfParsed.groupBy("date", "id")
        .pivot("key")
        .agg(first("val"))
        .write.format("delta")
        .mode("append")
        .save(destPath))

(dfInput.writeStream
    .trigger(processingTime='1 seconds')
    .option("checkpointLocation", "checkpointPath")
    .foreachBatch(forEachFunc)
    .start()
)
I tried pivoting inside the foreachBatch function after seeing a Stack Overflow solution (Pivot a streaming dataframe pyspark) and also this article (https://www.mssqltips.com/sqlservertip/6563/pivot-transformations-for-spark-streaming/), but I am still getting the same error: Queries with streaming sources must be executed with writeStream.start(); tahoe
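For reference, my reading of those links is that the pivot is supposed to run on the micro-batch DataFrame that Structured Streaming passes into the foreachBatch function (split_df here), since that one is a plain batch DataFrame, while dfInput remains a streaming source. A minimal sketch of that pattern, with destPath assumed to be defined elsewhere:

from pyspark.sql.functions import col, concat, explode, first, lit, split

def forEachFunc(split_df, batch_id):
    # split_df is a batch DataFrame, so pivot and a batch write are allowed on it
    dfParsed = (split_df
        .withColumn("kv", explode(col("status")))
        .withColumn("key", concat(lit("status_"), split(col("kv"), ":")[0]))
        .withColumn("val", split(col("kv"), ":")[1]))
    (dfParsed.groupBy("date", "id")
        .pivot("key")
        .agg(first("val"))
        .write.format("delta")
        .mode("append")
        .save(destPath))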