I have a stream of alerts coming from Kafka into Spark. The alerts are in JSON format and come from different IoT sensors.
Messages on the Kafka topic look like this:

```json
{ "id": "2093021", "alert": "Malfunction detected", "sensor_id": "14-23092-AS" }
{ "id": "2093021", "alert": "Malfunction detected", "sensor_id": "14-23092-AS", "alarm_code": "Severe" }
```
My code (`spark-client.py`):

```python
from __future__ import print_function

import sys
import json

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql.context import SQLContext

if __name__ == "__main__":
    spark = (SparkSession.builder
             .appName("myApp")
             .config("spark.mongodb.input.uri", "mongodb://spark:1234@172.31.9.44/at_cloudcentral.spark_test")
             .config("spark.mongodb.output.uri", "mongodb://spark:1234@172.31.9.44/at_cloudcentral.spark_test")
             .getOrCreate())
    sc = spark.sparkContext
    ssc = StreamingContext(sc, 10)  # 10-second batch interval

    zkQuorum, topic = sys.argv[1:]
    kafka_streams = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-sql-mongodb-test-consumer", {topic: 1})
    # Each Kafka record arrives as a (key, value) tuple; the value is the JSON alert
    dstream = kafka_streams.map(lambda x: json.loads(x[1]))
    dstream.pprint()

    ssc.start()
    ssc.awaitTermination()
```
When I run this:

```
ubuntu@ip-172-31-89-176:~/spark-connectors$ spark-submit spark-client.py localhost:2181 DetectionEntry
```

I get this output:

```
-------------------------------------------
Time: 2019-12-04 14:26:40
-------------------------------------------
{u'sensor_id': u'16-23092-AS', u'id': u'2093021', u'alert': u'Malfunction detected'}
```
I need to be able to save this alert to a remote MongoDB. I have two specific challenges:
- How do I correctly parse the output so that I can create a DataFrame that can be written to MongoDB? I tried adding this to the end of the code:

```python
d = [dstream]
df = spark.createDataFrame(d).collect()
```

and it gives me this error:

```
dataType <py4j.java_gateway.JavaMember object at 0x7f5912726750> should be an instance of <class 'pyspark.sql.types.DataType'>
```
- My alerts can have different JSON structures, and I need to dump all of them into a MongoDB collection, so a fixed schema won't work for me. Most of the similar questions and code I have found on Stack Overflow assume a fixed schema, and I can't figure out how to push the alerts to MongoDB so that each record in the collection keeps its own schema (JSON structure). Any pointers in the right direction would be appreciated.
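Regarding the first challenge: a DStream is not a collection of records. Every batch interval (10 seconds in the code above) it produces an RDD, and it is those per-batch RDDs that can be turned into DataFrames or written out, typically from inside `foreachRDD`. Wrapping the DStream object in a list and passing it to `createDataFrame` presumably makes Spark try to infer a schema from the DStream object's own attributes, which would explain the `JavaMember` error. The parsing itself can stay in plain Python. Below is a minimal sketch of a hypothetical `parse_alert` helper (not part of any Spark API) that turns one `(key, value)` tuple from `KafkaUtils.createStream` into a dict and returns `None` for values that are not JSON objects, so a single malformed message does not fail the whole batch.

```python
import json


def parse_alert(kafka_record):
    """Turn one (key, value) tuple from KafkaUtils.createStream into a dict.

    Hypothetical helper: returns None when the value is missing, is not
    valid JSON, or is valid JSON but not an object (e.g. a list).
    """
    _key, value = kafka_record
    try:
        alert = json.loads(value)
    except (TypeError, ValueError):  # None value, or malformed JSON
        return None
    return alert if isinstance(alert, dict) else None
```

It would slot in where `json.loads(x[1])` is used now, e.g. `kafka_streams.map(parse_alert).filter(lambda a: a is not None)`, with any DataFrame conversion done per batch inside `foreachRDD`. If you do go the DataFrame route on each batch (for example `spark.read.json(rdd.map(json.dumps))`), Spark infers one schema covering the union of all keys in that batch and fills missing fields with null, so individual records no longer keep their own structure.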
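Regarding the second challenge: MongoDB does not require the documents in a collection to share a schema (unless schema validation has been configured on that collection), so one option that avoids DataFrame schemas altogether is to insert the parsed dicts directly with pymongo, once per partition of each batch. This swaps in pymongo in place of the MongoDB Spark connector configured in the question, and it is a sketch under several assumptions: pymongo is installed on every executor, the connection settings below are derived from the `spark.mongodb.output.uri` in the question, and `save_partition` and `client_factory` are hypothetical names.

```python
# Assumed connection settings, derived from spark.mongodb.output.uri in the
# question (database "at_cloudcentral", collection "spark_test").
MONGO_URI = "mongodb://spark:1234@172.31.9.44/at_cloudcentral"
DB_NAME = "at_cloudcentral"
COLLECTION_NAME = "spark_test"


def save_partition(alerts, client_factory=None):
    """Insert every alert dict in one partition as its own MongoDB document.

    Each dict keeps exactly the keys it arrived with, so alerts with and
    without "alarm_code" can sit side by side in the same collection.
    Returns the number of documents inserted.
    """
    docs = [dict(a) for a in alerts]  # copy: insert_many adds "_id" to each doc
    if not docs:
        return 0  # pymongo's insert_many rejects an empty list
    if client_factory is None:
        # Imported lazily; the executors running this still need pymongo.
        from pymongo import MongoClient
        client_factory = lambda: MongoClient(MONGO_URI)
    client = client_factory()  # one connection per partition, not per record
    try:
        client[DB_NAME][COLLECTION_NAME].insert_many(docs)
    finally:
        client.close()
    return len(docs)
```

It would be wired to the parsed stream from the question with `dstream.foreachRDD(lambda rdd: rdd.foreachPartition(save_partition))`. Opening one client per partition keeps the number of connections proportional to partitions rather than records; `client_factory` exists mainly so the function can be exercised without a live database.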