
I have a stream of alerts coming from Kafka to Spark. The alerts are JSON messages from different IoT sensors.

Sample Kafka messages:

{ "id":"2093021", alert:"Malfunction detected","sensor_id":"14-23092-AS" }
{ "id":"2093021", alert:"Malfunction detected","sensor_id":"14-23092-AS" , "alarm_code": "Severe" }

My code: spark-client.py

from __future__ import print_function
import sys
import json

from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession

if __name__ == "__main__":
    spark = SparkSession.builder \
        .appName("myApp") \
        .config("spark.mongodb.input.uri", "mongodb://spark:1234@172.31.9.44/at_cloudcentral.spark_test") \
        .config("spark.mongodb.output.uri", "mongodb://spark:1234@172.31.9.44/at_cloudcentral.spark_test") \
        .getOrCreate()
    sc = spark.sparkContext
    ssc = StreamingContext(sc, 10)  # 10-second micro-batches

    zkQuorum, topic = sys.argv[1:]

    # Receiver-based Kafka stream; each record is a (key, value) tuple
    kafka_streams = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-sql-mongodb-test-consumer", {topic: 1})

    # Parse the JSON payload of each message
    dstream = kafka_streams.map(lambda x: json.loads(x[1]))
    dstream.pprint()

    ssc.start()
    ssc.awaitTermination()

When I run this

ubuntu@ip-172-31-89-176:~/spark-connectors$ spark-submit spark-client.py localhost:2181 DetectionEntry

I get this output

-------------------------------------------
Time: 2019-12-04 14:26:40
-------------------------------------------
{u'sensor_id': u'16-23092-AS', u'id': u'2093021', u'alert': u'Malfunction detected'}

I need to be able to save these alerts to a remote MongoDB. I have two specific challenges:

  1. How do I correctly parse the output so that I can create a DataFrame that can be written to MongoDB? I have tried adding this to the end of my code (a rough sketch of the per-batch approach I am aiming for follows this list):

    d = [dstream]
    df = spark.createDataFrame(d).collect()

and it gives me this error

dataType <py4j.java_gateway.JavaMember object at 0x7f5912726750> should be an instance of <class 'pyspark.sql.types.DataType'>

  2. My alerts can have different JSON structures, and I'll need to dump them into a MongoDB collection. As such, a fixed schema won't work for me. Most of the similar questions and code that I have referred to on Stack Overflow are specific to a fixed schema, and I'm unable to figure out how to push this to MongoDB in a way that each record in the collection keeps its own schema (JSON structure). Any pointers in the right direction would be appreciated.
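
For reference, here is the rough, untested sketch mentioned in challenge 1. It relies on the MongoDB Spark connector already configured in the SparkSession above, the handler name is only illustrative, and it would have to be registered before ssc.start():

def save_to_mongo(rdd):
    # Skip empty micro-batches
    if not rdd.isEmpty():
        # Re-serialise the parsed dicts and let spark.read.json infer the
        # schema per batch, so records with extra fields (e.g. "alarm_code")
        # keep their own structure when written out.
        df = spark.read.json(rdd.map(json.dumps))
        df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

dstream.foreachRDD(save_to_mongo)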
  • So you have data in a Kafka topic, that you want to process, and stream to MongoDB? Are you set on using Spark? Because from the looks of it Kafka Connect to stream the data directly to Mongo would work and be easier. – Robin Moffatt Dec 04 '19 at 15:01
  • Since MongoDB does not require a fixed schema, streaming directly to it would work, as @RobinMoffatt mentioned. Check the documentation: https://docs.mongodb.com/manual/core/data-modeling-introduction/#flexible-schema You can also check the Spark Streaming programming guide for Scala to get an idea of it: https://docs.mongodb.com/spark-connector/master/scala/streaming/ – Adam Dukkon Dec 04 '19 at 15:17
  • Saving the Kafka stream directly to MongoDB would store a lot of unwanted logs. I wish to store only the relevant logs that Spark will process and convert. The rest can be ignored or dumped into flat-file storage. – Donny Dec 04 '19 at 15:33

1 Answer


We can parse the Kafka JSON messages easily with the PySpark Structured Streaming API by invoking a simple UDF. You can check the complete code in the Stack Overflow link below for reference: Pyspark Structured streaming processing
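
For example, a minimal sketch of that approach could look like the following (illustrative only, not the linked answer's exact code: the broker address, topic name and foreachBatch handler are assumptions, and it infers the schema per micro-batch instead of using a fixed-schema UDF):

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = SparkSession.builder \
    .appName("myApp") \
    .config("spark.mongodb.output.uri", "mongodb://spark:1234@172.31.9.44/at_cloudcentral.spark_test") \
    .getOrCreate()

# Structured Streaming reads from the Kafka brokers directly (not ZooKeeper)
raw = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "DetectionEntry") \
    .load()

# Keep the raw JSON payload as a string column
alerts = raw.select(col("value").cast("string").alias("json_str"))

def write_batch(batch_df, batch_id):
    # Re-read the JSON strings so every micro-batch gets its own inferred
    # schema; MongoDB does not require the documents to share one schema.
    docs = spark.read.json(batch_df.rdd.map(lambda r: r.json_str))
    docs.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").save()

query = alerts.writeStream.foreachBatch(write_batch).start()
query.awaitTermination()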