
I am new to Kafka and PySpark and trying to write a simple program. I have two files in JSON format in a Kafka topic, and I am reading them with PySpark Structured Streaming.

My Producer code is as follows:

from kafka import KafkaProducer
import json
import boto3

class producer:
    def json_serializer(data):
        # Serialize the payload as UTF-8 encoded JSON bytes
        return json.dumps(data).encode("utf-8")

    def read_s3():
        p1 = KafkaProducer(bootstrap_servers=['localhost:9092'],
                           value_serializer=producer.json_serializer)
        s3 = boto3.resource('s3')
        bucket = s3.Bucket('kakfa')
        # Send each S3 object's contents as one message on the topic
        for obj in bucket.objects.all():
            body = obj.get()['Body'].read().decode('utf-8')
            p1.send("Uber_Eats", body)
        p1.flush()

My Consumer code is as follows:

from pyspark.sql import SparkSession
from kafka import KafkaConsumer

class consumer:
    def read_from_topic(self, spark):
        # Read the topic as a streaming DataFrame, starting from the earliest offsets
        df = spark.readStream \
            .format("kafka") \
            .option("kafka.bootstrap.servers", "localhost:9092") \
            .option("subscribe", "Uber_Eats") \
            .option("startingOffsets", "earliest") \
            .load()
        df.createOrReplaceTempView("kafka")
        spark.sql("select * from kafka")
        print(df.isStreaming)  # isStreaming is a property, not a method

    def get_consumer(self):
        consumer = KafkaConsumer("Uber_Eats", group_id='group1',
                                 bootstrap_servers="localhost:9092")
        return consumer

    def print_details(self, c1):
        # self.consumer = self.get_consumer(self)
        # Read and print every message from the consumer
        try:
            for msg in c1:
                print(msg.topic, msg.value)
            print("Done")
        except Exception as e:
            print(e)

Main Class:

from Producer_Group import *
from Consumer_Group import *
from Spark_Connection import *

class client:
    def transfer(self):
        spark = connection.get_connection(self)
        producer.read_s3()
        c1 = consumer.get_consumer(spark)
        consumer.read_from_topic(self, spark)
        # consumer.print_details(self, c1)

c = client()
c.transfer()

Sample data in S3 that I am reading into the Kafka topic:

{
    
        {
            "Customer Number": "1",
            "Customer Name": "Aditya",
            "Restaurant Number": "2201",
            "Restaurant NameOrdered": "Bawarchi",
            "Number of Items": "3",
            "price": "10",
            "Operating Start hours": "9:00",
            "Operating End hours": "23:00"
        },
        {
            "Customer Number": "2",
            "Customer Name": "Sarva",
            "Restaurant Number": "2202",
            "Restaurant NameOrdered": "Sarvana Bhavan",
            "Number of Items": "4",
            "price": "20",
            "Operating Start hours": "8:00",
            "Operating End hours": "20:00"
        },
        {
            "Customer Number": "3",
            "Customer Name": "Kala",
            "Restaurant Number": "2203",
            "Restaurant NameOrdered": "Taco Bell",
            "Number of Items": "5",
            "price": "30",
            "Operating Start hours": "11:00",
            "Operating End hours": "21:00"
        }
    
}

What I have tried so far: I have tried to print to the console in order to check a condition, and only if it passes would I insert the record into the database. To check the condition, I read data in the "read_from_topic" function and create a view (createOrReplaceTempView) to see the data, but nothing is printed. Can someone please guide me on how to print and verify whether my conditions or data are read correctly?

Thanks in advance!

  • I think I've told you before, `KafkaConsumer` and `readStream.format("kafka")` are two completely separate libraries, and you shouldn't use the first if you want to use Spark, and you shouldn't use the second if the only goal is to consume from Kafka... So, can you please clarify your goal? Besides that, you've only shown a class definition that you've never called, so what piece of this code is supposed to print anything? – OneCricketeer May 08 '21 at 14:18
  • I have included the "Main class" and sample data (JSON), please have a look. I am trying to read from the Kafka topic, and for each customer there is an "Operating Start hours"; only if it is greater than 8:00 should I insert the record into the MySQL database, else ignore it. This is my requirement. – asd May 08 '21 at 16:31
  • My goal is to only consume from the Kafka topic, do transformations in Spark streaming, and based on that insert into the database – asd May 08 '21 at 16:33
  • I mean, Python's `KafkaConsumer` can also "do transformations" and you can combine that with other libraries to "write to databases", so still unclear why you think you'll need Spark for that? Also, I wouldn't even use Python for this since Kafka comes with Kafka Connect for exactly this purpose of writing to external systems https://rmoff.net/2021/03/12/kafka-connect-jdbc-sink-deep-dive-working-with-primary-keys/ Besides that, what happens when this runs? Because you should actually be getting an error around `consumer.read_from_topic(self,spark)` – OneCricketeer May 08 '21 at 22:54

1 Answer


creating a view (createOrReplaceTempView) to see data, but nothing is printing

Because spark.sql returns a new DataFrame.

If you want to print it, then you'll need

spark.sql("select * from kafka").show()

However, this alone will show at least two byte-array columns (the Kafka key and value), not JSON strings, so you'll want to define a schema at some point to extract anything, or CAST the value to a string to at least have human-readable data.
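For illustration only, a minimal sketch of that idea, assuming each message on the topic is a single valid JSON object with the fields shown in your sample data (the schema, variable names, and console sink below are an example, not tested against your setup):

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StringType

# df is the streaming DataFrame returned by spark.readStream...load() above
schema = StructType() \
    .add("Customer Number", StringType()) \
    .add("Customer Name", StringType()) \
    .add("Operating Start hours", StringType())

parsed = df.selectExpr("CAST(value AS STRING) AS json_str") \
           .select(from_json(col("json_str"), schema).alias("data")) \
           .select("data.*")

# A streaming DataFrame cannot be printed with show(); write it to a sink such
# as the console sink and call start()/awaitTermination() instead.
query = parsed.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()
query.awaitTermination()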

Also worth pointing out that the data you've shown is not valid JSON, and boto3 isn't necessary since Spark can read files from S3 itself (and thus Kafka isn't strictly needed either, since you could load the S3 data directly into your final location, with a Spark persist() in between).
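If you went that route instead, a rough sketch (assuming the hadoop-aws package and S3 credentials are already configured for Spark; the bucket name is taken from your producer code):

# Read the JSON files straight from S3 with Spark, no boto3 or Kafka in between.
# multiLine is needed when each file holds one pretty-printed JSON document.
df = spark.read.option("multiLine", True).json("s3a://kakfa/")
df.show(truncate=False)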

  • I am trying to use the .show() method that you gave, but it is giving me an error: pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start(); – asd May 09 '21 at 03:20
  • Correct. You need to start() and awaitTermination. Here's an example from the Spark source code https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala#L83-L90 – OneCricketeer May 09 '21 at 03:24
  • Yes, I have tried that, but it is giving me a different error: ERROR StreamMetadata: Error writing stream metadata StreamMetadata(68220860-8bb2-4058-8799-64d5ef5fcc7d) to file:/C:/Users/komu0/AppData/Local/Temp/temporary-7975a168-d556-4ec3-abfb-f23c8c8508cd/metadata ExitCodeException exitCode=-1073741515: – asd May 09 '21 at 03:30
  • I don't know about this. I also don't develop on windows, but seems there's some solutions here https://stackoverflow.com/questions/45947375/why-does-starting-a-streaming-query-lead-to-exitcodeexception-exitcode-1073741 – OneCricketeer May 09 '21 at 03:33
  • Sure, thank you so much – asd May 09 '21 at 03:37