
I have a streaming dataframe from kafka and I need to pivot two columns. This is the code I'm currently using:

streaming_df = streaming_df.groupBy('Id','Date')\
            .pivot('Var')\
            .agg(first('Val'))

query = streaming_df.limit(5) \
            .writeStream \
            .outputMode("append") \
            .format("memory") \
            .queryName("stream") \
            .start()

time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()


I receive the following error: pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start()

pyspark version: 3.1.1

Any ideas how to implement pivot with a streaming dataframe?

Hanna
  • Does it [help](https://stackoverflow.com/questions/40609771/queries-with-streaming-sources-must-be-executed-with-writestream-start) you? – Kafels Jul 14 '21 at 14:26
  • Unfortunately not. I updated the code that appears after start(). @Kafels – Hanna Jul 14 '21 at 14:37
  • Your sql command should appear before the `start()` since it applies to streaming data. Or you have to `stop()` the stream before running your sql command. Let me know if it helped you! – Dimitri Sifoua Jul 14 '21 at 18:47
  • Doesn't work, I tried both versions. The problem is applying pivot on a streaming df, since the sql command worked with other transformations (excluding pivot). @DimitriK.Sifoua – Hanna Jul 15 '21 at 06:33
  • I understand. In fact, the pivot transformation is not supported when it's applied to streaming data. You have to use `foreachBatch` with a user-defined function which will apply the pivot transformation in batch mode. – Dimitri Sifoua Jul 15 '21 at 21:06

1 Answer


The pivot transformation is not supported by Spark when applied to streaming data.

What you can do is use `foreachBatch` with a user-defined function like this:

from pyspark.sql.functions import first

def apply_pivot(stream_df, batch_id):
    # Here your pivot transformation, applied to the micro-batch
    # (pivot works here because stream_df is a regular batch DataFrame)
    pivoted_df = stream_df \
        .groupBy('Id', 'Date') \
        .pivot('Var') \
        .agg(first('Val'))

    # Write the pivoted batch with the batch writer API
    # ('memory', outputMode and queryName only exist on streaming writers);
    # appending to a table keeps the result queryable with spark.sql
    pivoted_df.write \
        .mode('append') \
        .saveAsTable('stream')

query = streaming_df.limit(5) \
    .writeStream \
    .foreachBatch(apply_pivot) \
    .start()

time.sleep(50)
spark.sql("select * from stream").show(20, False)
query.stop()

Let me know if it helped you!

Dimitri Sifoua
  • 490
  • 5
  • 17
  • Hi, thank you. A follow-up question: my input is a streaming Kafka topic, and in the next stages I need to save my cleaned + aggregated + filtered data in an MS SQL server. Assuming I apply pivot and any other aggregate function on the current micro-batch using foreachBatch, do the changes take previous batches from past runs into account? For example, if I compute an avg on a column, is this avg accumulated? – Hanna Jul 16 '21 at 15:26
  • Every aggregation function you execute inside foreachBatch will be applied only to the current batch, of course! – Dimitri Sifoua Jul 16 '21 at 16:33
  • So what about computations done inside foreach? Are they reflected in the data store (MS SQL server), and more importantly, how can I make sure they are accumulated and computed over every relevant record across different batches? – Hanna Jul 16 '21 at 16:50
  • foreach applies a computation to each row, whereas foreachBatch applies a computation to a group of rows. If you want to perform computations across the entire stream, you can use an accumulator from the Spark context; see the sketch below the comments. [link](https://spark.apache.org/docs/2.2.0/rdd-programming-guide.html#accumulators) – Dimitri Sifoua Jul 22 '21 at 04:34
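
A minimal sketch of the approach from the last comment, combining the foreachBatch pivot from the answer with a JDBC write to SQL Server and driver-side accumulators for a running average across batches. The JDBC url, table, credentials, and the assumption that Val is numeric are hypothetical placeholders, not from the original thread:

from pyspark.sql.functions import col, count, first, sum as sum_

# Accumulators live on the driver and persist across micro-batches
val_sum = spark.sparkContext.accumulator(0.0)
val_count = spark.sparkContext.accumulator(0)

def process_batch(batch_df, batch_id):
    # Per-batch pivot, exactly as in the answer above
    pivoted_df = batch_df \
        .groupBy('Id', 'Date') \
        .pivot('Var') \
        .agg(first('Val'))

    # Append the pivoted batch to a SQL Server table over JDBC
    # (connection details below are placeholders)
    pivoted_df.write \
        .format('jdbc') \
        .option('url', 'jdbc:sqlserver://myhost:1433;databaseName=mydb') \
        .option('dbtable', 'dbo.pivoted_stream') \
        .option('user', 'myuser') \
        .option('password', 'mypassword') \
        .mode('append') \
        .save()

    # foreachBatch runs on the driver, so accumulators can be updated here
    # to keep a running average of Val over all batches seen so far
    stats = batch_df.agg(
        sum_(col('Val').cast('double')).alias('s'),
        count('Val').alias('c')
    ).collect()[0]
    if stats['s'] is not None:
        val_sum.add(stats['s'])
        val_count.add(stats['c'])
        print(f"batch {batch_id}: running avg of Val = {val_sum.value / val_count.value}")

query = streaming_df.writeStream.foreachBatch(process_batch).start()

Note that the SQL Server JDBC driver jar has to be available on the Spark classpath for the JDBC write to work.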