I want to parse an array column whose elements look like ["key: value key1: value1 key2: value2 .. key_35: value_35\n", ""], turn each key into a column name (alongside the existing columns), and fill that column with the corresponding value.
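For example (values made up for illustration), a row like

    date        id   status
    2021-01-01  1    ["key: value key1: value1\n", ""]

should end up as

    date        id   status_key   status_key1
    2021-01-01  1    value        value1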

I am reading from the silver layer of a Delta lake and writing back to the silver layer again.

This is the code I am using, but I am getting the error "Queries with streaming sources must be executed with writeStream.start(); tahoe":

from pyspark.sql.functions import col, concat, explode, first, lit, split, trim

dfInput = (spark.readStream
           .option("maxFilesPerTrigger", 1)
           .format("delta")
           .load(path)
           .withColumn("status", col("status").getItem(0))  # keep only the first array element
           .select(col("date"), col("id"), col("status"))
           .withColumn("status", trim(col("status")))
           .withColumn("status", split(col("status"), "  "))  # split into "key: value" pairs on double spaces
          )
      
def forEachFunc(split_df, batch_id):
    # parse each "key: value" pair out of the status array
    dfParsed = (dfInput
                .withColumn("kv", explode(col("status")))
                .withColumn("key", concat(lit("status_"), split(col("kv"), ":")[0]))
                .withColumn("val", split(col("kv"), ":")[1]))

    # pivot the keys into columns and append the batch to the destination table
    (dfParsed.groupBy("date", "id")
     .pivot("key")
     .agg(first("val"))
     .write.format("delta")
     .mode("append")
     .save(destPath))

(dfInput.writeStream
  .trigger(processingTime='1 seconds')
  .option("checkpointLocation", "checkpointPath")
  .foreachBatch(forEachFunc)
  .start()
)

I tried pivoting inside the foreachBatch function after seeing a Stack Overflow answer (Pivot a streaming dataframe pyspark) and this article (https://www.mssqltips.com/sqlservertip/6563/pivot-transformations-for-spark-streaming/), but I am still getting the same error: "Queries with streaming sources must be executed with writeStream.start(); tahoe".
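Could the issue be that forEachFunc refers to the streaming dfInput instead of split_df, the plain micro-batch DataFrame that foreachBatch passes in? This is the (untested) variant I am planning to try next:

def forEachFunc(split_df, batch_id):
    # split_df is a static DataFrame holding only this micro-batch,
    # so pivoting and a normal batch write should be allowed here
    dfParsed = (split_df
                .withColumn("kv", explode(col("status")))
                .withColumn("key", concat(lit("status_"), split(col("kv"), ":")[0]))
                .withColumn("val", split(col("kv"), ":")[1]))

    (dfParsed.groupBy("date", "id")
     .pivot("key")
     .agg(first("val"))
     .write.format("delta")
     .mode("append")
     .save(destPath))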
