I am new to kafka and pyspark and trying to write simple program , SO I have 2 files in kafka Topics in JSon format and I am reading this from pyspark streaming.
My Producer code is as follows:
from kafka import *
import json
import time
import boto3
import json
from Consumer_Group import *
from json import loads
class producer :
def json_serializer(data):
return json.dumps(data).encode("utf-8")
def read_s3():
p1 = KafkaProducer(bootstrap_servers=['localhost:9092'], value_serializer=producer.json_serializer)
s3 = boto3.resource('s3')
bucket = s3.Bucket('kakfa')
for obj in bucket.objects.all():
key = obj.key
body = obj.get()['Body'].read().decode('utf-8')
p1.send("Uber_Eats",body)
p1.flush()
My Consumer code is as follows:
from pyspark.sql import SparkSession
from kafka import *
import time
class consumer:
def read_from_topic(self,spark):
df = spark.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "Uber_Eats") \
.option("startingOffsets", "earliest") \
.load()
df.createOrReplaceTempView("kafka")
spark.sql("select * from kafka")
print(df.isStreaming())
def get_consumer(self):
consumer = KafkaConsumer("Uber_Eats", group_id='group1', bootstrap_servers=
"localhost:9092")
return consumer
def print_details(self,c1):
# self.consumer=self.get_consumer(self)
# Read and print message from consumer
try:
for msg in c1:
print(msg.topic, msg.value)
print("Done")
except Exception as e:
print(e)
Main Class:
from Producer_Group import *
from Consumer_Group import *
from Spark_Connection import *
class client:
def transfer(self):
spark = connection.get_connection(self)
producer.read_s3()
c1 = consumer.get_consumer(spark)
consumer.read_from_topic(self,spark)
# consumer.print_details(self,c1)
c=client()
c.transfer()
Sample Data in S3 that i am reading into kafka Topic :
{
{
"Customer Number": "1",
"Customer Name": "Aditya",
"Restaurant Number": "2201",
"Restaurant NameOrdered": "Bawarchi",
"Number of Items": "3",
"price": "10",
"Operating Start hours": "9:00",
"Operating End hours": "23:00"
},
{
"Customer Number": "2",
"Customer Name": "Sarva",
"Restaurant Number": "2202",
"Restaurant NameOrdered": "Sarvana Bhavan",
"Number of Items": "4",
"price": "20",
"Operating Start hours": "8:00",
"Operating End hours": "20:00"
},
{
"Customer Number": "3",
"Customer Name": "Kala",
"Restaurant Number": "2203",
"Restaurant NameOrdered": "Taco Bell",
"Number of Items": "5",
"price": "30",
"Operating Start hours": "11:00",
"Operating End hours": "21:00"
}
}
What Have I tried so far: : I have tried to print on console so as to check for condition and if it passes only then insert it into databse. to check the condtiion , I am reading data from "read_from_topic" function and creating a view (createOrReplaceTempView) to see data, but nothing is printing, can some one please guide me how to print and verify if My conditions or data is read correclty?
Thanks in Advance !!!!