
I have an MQTT consumer in PySpark, but whenever I send data to the MQTT topic, the PySpark job fails.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Test") \
    .getOrCreate()

# Read from the MQTT broker via the Apache Bahir structured-streaming source
lines = (spark
             .readStream
             .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
             .option("topic", "my_topic")
             .load("tcp://127.0.0.1:1883"))

# Print each micro-batch to the console
query = lines.writeStream.format("console").start()

lines.printSchema()

query.awaitTermination()

I send the messages to MQTT without Spark, using this paho-mqtt script:

import sys
import paho.mqtt.client as mqtt


def on_connect(client, userdata, flags, rc):
    print('connected (%s)' % client._client_id)


def on_message(client, userdata, message):
    print('------------------------------')
    print('topic: %s' % message.topic)
    print('payload: %s' % message.payload)
    print('qos: %d' % message.qos)


def main(argv):
    broker = "127.0.0.1"
    port = 1883
    mqttc = mqtt.Client("Test1")
    # Register the callbacks before connecting so no events are missed
    mqttc.on_connect = on_connect
    mqttc.on_message = on_message
    print("connecting to broker ", broker)
    mqttc.connect(broker, port, 60)

    mqttc.subscribe("my_topic")

    print("publishing ")
    mqttc.publish("my_topic", "{\"messageeeee\"}")

    mqttc.loop_forever()


if __name__ == '__main__':
    main(sys.argv[1:])
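
As an aside, the payload "{\"messageeeee\"}" above is not valid JSON: {"messageeeee"} is an object member with a key but no value. If the intent is to publish JSON, a minimal sketch using paho's one-shot publish helper would look like this (the "message"/"hello" pair is my assumption, not something from the original script):

import json
import paho.mqtt.publish as publish

# Sketch only: the payload content is assumed. publish.single opens a
# connection, publishes one message, and disconnects.
payload = json.dumps({"message": "hello"})
publish.single("my_topic", payload, hostname="127.0.0.1", port=1883)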

This is the error that stops the PySpark Streaming job each time I publish a new message to MQTT:

Logical Plan:
org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource@72a6d759
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:295)
    at org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:189)
Caused by: java.lang.AssertionError: assertion failed: DataFrame returned by getBatch from org.apache.bahir.sql.streaming.mqtt.MQTTTextStreamSource@72a6d759 did not have isStreaming=true
Project [_1#10 AS value#13, _2#11 AS timestamp#14]
+- AnalysisBarrier
      +- LocalRelation [_1#10, _2#11]

    at scala.Predef$.assert(Predef.scala:170)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:395)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1$$anonfun$apply$9.apply(MicroBatchExecution.scala:390)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
    at org.apache.spark.sql.execution.streaming.StreamProgress.foreach(StreamProgress.scala:25)
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
    at org.apache.spark.sql.execution.streaming.StreamProgress.flatMap(StreamProgress.scala:25)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch$1.apply(MicroBatchExecution.scala:390)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.org$apache$spark$sql$execution$streaming$MicroBatchExecution$$runBatch(MicroBatchExecution.scala:389)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply$mcV$sp(MicroBatchExecution.scala:133)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1$$anonfun$apply$mcZ$sp$1.apply(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:271)
    at org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:58)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution$$anonfun$runActivatedStream$1.apply$mcZ$sp(MicroBatchExecution.scala:121)
    at org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:56)
    at org.apache.spark.sql.execution.streaming.MicroBatchExecution.runActivatedStream(MicroBatchExecution.scala:117)
    at org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runStream(StreamExecution.scala:279)
    ... 1 more


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/mimi/test/InitiatorSpark.py", line 41, in <module>
    query.awaitTermination()

I don't know what the problem with query.awaitTermination() is. Is the format of the message I send to MQTT incorrect, or what exactly is happening here?

  • Did you find a solution to this? I am facing the same issue. – Rakesh Rakshit Jul 31 '18 at 06:10
  • What is your PySpark version? – Rakesh Rakshit Jul 31 '18 at 06:13
  • @RakeshRakshit: yes, I partly solved the issue. Take a look: https://stackoverflow.com/questions/51539424/how-to-get-dataframe-in-structured-streaming – ScalaBoy Jul 31 '18 at 13:12
  • @RakeshRakshit: but the problem is that I still cannot parse the JSON strings received from the MQTT queue; I get null values. It's explained in the thread I just shared above. I am using PySpark 2.2.1 and submit jobs like this: `~/Downloads/spark-2.2.1-bin-hadoop2.7/bin/spark-submit --jars lib/spark-streaming-mqtt_2.11-2.2.1.jar,lib/spark-sql-streaming-mqtt_2.11-2.2.1.jar,lib/org.eclipse.paho.client.mqttv3-1.2.0.jar InitiatorSpark.py` – ScalaBoy Jul 31 '18 at 13:13
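
Regarding the null values mentioned in the last comment: from_json returns null whenever the payload is not valid JSON or does not match the supplied schema, so a payload such as {"messageeeee"} cannot be parsed. A minimal parsing sketch against the value column produced by the MQTT source (the "message" field name is an assumption) would be:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StringType, StructField, StructType

# Hypothetical schema: the "message" field name is an assumption
schema = StructType([StructField("message", StringType())])

# from_json yields null for malformed input or a schema mismatch
parsed = lines.select(from_json(col("value"), schema).alias("data")).select("data.*")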

0 Answers