
My host configuration is as follows:

Kafka, Spark, and MySQL are running on Docker.

My code is as follows:

# findspark locates the local pyspark installation
import sys
from kafka import KafkaProducer, KafkaConsumer
import findspark
import boto3
import json

findspark.init()
# Creating Spark Context
from pyspark import SparkContext
from pyspark.sql import SparkSession
def get_connection(self):
     spark = SparkSession.builder.master("local[*]").appName("SparkByExamples.com").getOrCreate()  
     return spark   

def json_serializer(data):
     return json.dumps(data).encode("utf-8")
    

def read_s3():
    p1 = KafkaProducer(bootstrap_servers=['broker:29092'], value_serializer=json_serializer)
    
    s3 = boto3.resource('s3')
    bucket = s3.Bucket('kakfa')
    for obj in bucket.objects.all():
        key = obj.key
        body = obj.get()['Body'].read().decode('utf-8')
    p1.send("Uber_Eats",body)
    p1.flush()
def read_from_topic(self,spark):
    df = spark.readStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "broker:29092") \
        .option("subscribe", "Uber_Eats") \
         .option("startingOffsets", "earliest") \
        .load()
    df2 = df \
        .writeStream \
        .format("console") \
        .start()
    df2.awaitTermination()
def get_consumer(self):
    consumer = KafkaConsumer("Uber_Eats", group_id='group1',
                             bootstrap_servers="broker:29092",
                             value_deserializer=lambda x: json.loads(x.decode('utf-8')))
    return consumer
def print_details(self,c1):
    for msg in c1:
          print(msg.value)
    print("Dom  dfe")            
           
          
class Foo:
    def __init__(self):
                 
        spark = get_connection(self)
        read_s3()
        # System.setProperty("hadoop.home.dir", "$HADOOP_HOME\winutils-master\hadoop-2.x.x")
        c1 = get_consumer(spark)
        print_details(self,c1)


f = Foo()  

My output from the above code is as follows:

{
    
        {
            "Customer Number": "1",
            "Customer Name": "Shyam",
            "Restaurant Number": "2201",
            "Restaurant NameOrdered": "Bawarchi",
            "Number of Items": "3",
            "price": "10",
            "Operating Start hours": "9:00",
            "Operating End hours": "23:00"
        },
        {
            "Customer Number": "2",
            "Customer Name": "Rohini",
            "Restaurant Number": "2202",
            "Restaurant NameOrdered": "Sarvana Bhavan",
            "Number of Items": "4",
            "price": "20",
            "Operating Start hours": "8:00",
            "Operating End hours": "20:00"
        },
        {
            "Customer Number": "3",
            "Customer Name": "Bhairav",
            "Restaurant Number": "2203",
            "Restaurant NameOrdered": "Taco Bell",
            "Number of Items": "5",
            "price": "30",
            "Operating Start hours": "11:00",
            "Operating End hours": "21:00"
        }
    
}

How do I read this into columns in MySQL? i) Is it like a regular JSON file, read and insert?

ii) Or is there anything Kafka-consumer-specific about the 'json' format?

iii) I have specified value_deserializer=lambda x: json.loads(x.decode('utf-8')) in the code to get the data in JSON format. Is this necessary to load the data into MySQL?

Thanks,

Adi

asd
  • To send data from Kafka to another system, or to send data from another system to Kafka, you can use Kafka connectors – Felipe May 18 '21 at 05:57
  • Here is a link that might help you start: https://docs.confluent.io/cloud/current/connectors/cc-mysql-sink.html – Felipe May 18 '21 at 06:02

1 Answer


Is it like a regular JSON file, read and insert?

Not sure what you mean by this. MySQL doesn't accept JSON files.

Spark has its own JSON file reader, but you're reading from Kafka, so that's irrelevant here.

Is there anything Kafka-consumer-specific about the 'json' format?

Yes: CAST(value AS STRING) followed by various get_json_object calls. I've already linked you to the Databricks blog series that covers this.
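
For illustration, a rough sketch of that approach applied to the df returned by your read_from_topic reader; the field names come from your sample records, keys containing spaces are addressed with the bracketed JSON-path form, and the snake_case aliases are just suggested column names:

from pyspark.sql.functions import col, get_json_object

# Kafka delivers the value as bytes, so cast it to a string first
json_df = df.selectExpr("CAST(value AS STRING) AS json_str")

# extract individual fields from the JSON string into columns
parsed = json_df.select(
    get_json_object(col("json_str"), "$['Customer Number']").alias("customer_number"),
    get_json_object(col("json_str"), "$['Customer Name']").alias("customer_name"),
    get_json_object(col("json_str"), "$.price").alias("price"),
)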

I have specified value_deserializer=lambda x: json.loads(x.decode('utf-8'))

This isn't Spark, and I'm not sure why you still have it. Second, def get_consumer(self) neither accepts nor uses the spark variable you've passed it, and there is no class definition around it, so naming the parameter self is misleading (in other words, all of your functions should live inside class Foo, but you don't really need classes here at all).

Important detail - the file you've shown is not valid JSON, so none of these methods is going to work immediately anyway.

tl;dr - assuming you actually want to use Spark:

  1. Use the read_from_topic function you've written that uses the Spark consumer

  2. Replace

df \
  .writeStream \
  .format("console")

With a JDBC writer. Structured Streaming has no built-in jdbc sink, so in practice that means .writeStream.foreachBatch(...), where each micro-batch is written with the regular .write.format("jdbc") against "jdbc:mysql://…", and only after your dataframe has been modified to match the database schema
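
A minimal sketch of that write, assuming a parsed dataframe like the one above; the host, database, table, and credentials are placeholders, and the MySQL JDBC driver jar has to be on Spark's classpath:

def write_batch(batch_df, batch_id):
    # each micro-batch arrives as a plain (non-streaming) dataframe,
    # so the ordinary JDBC batch writer applies here
    batch_df.write \
        .format("jdbc") \
        .option("url", "jdbc:mysql://mysql:3306/uber_eats") \
        .option("dbtable", "orders") \
        .option("user", "root") \
        .option("password", "example") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .mode("append") \
        .save()

parsed.writeStream.foreachBatch(write_batch).start().awaitTermination()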


Otherwise, if you don't want Spark anymore, then JSON or Kafka is an implementation detail: download and configure a MySQL Python client, then insert data like normal, being careful about transactions, rollbacks, error handling, prepared queries, etc.
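
To make that concrete, here is a bare-bones sketch built on the KafkaConsumer you already have, using mysql-connector-python as one possible client; the database, table, and column names are invented for the example:

import json
import mysql.connector
from kafka import KafkaConsumer

consumer = KafkaConsumer("Uber_Eats", group_id="group1",
                         bootstrap_servers="broker:29092",
                         value_deserializer=lambda x: json.loads(x.decode("utf-8")))

conn = mysql.connector.connect(host="localhost", port=3306, user="root",
                               password="example", database="uber_eats")
cursor = conn.cursor()

# a prepared statement guards against SQL injection
sql = ("INSERT INTO orders (customer_number, customer_name, price) "
       "VALUES (%s, %s, %s)")

for msg in consumer:
    record = msg.value  # already a dict, thanks to the value_deserializer
    try:
        cursor.execute(sql, (record["Customer Number"],
                             record["Customer Name"],
                             record["price"]))
        conn.commit()
    except mysql.connector.Error:
        conn.rollback()

This is also where your question (iii) answers itself: the value_deserializer is a convenience that hands you a dict instead of raw bytes; MySQL itself doesn't require it.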


Or, as answered and commented several times, and as the more fault-tolerant solution: forget Python and use Kafka Connect (scripts are available in your Kafka bin directory, and it requires no coding)
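
For reference, a JDBC sink configuration could look roughly like the following; this assumes the Confluent kafka-connect-jdbc plugin is installed, and every name and credential here is a placeholder:

name=uber-eats-mysql-sink
connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
topics=Uber_Eats
connection.url=jdbc:mysql://mysql:3306/uber_eats
connection.user=root
connection.password=example
insert.mode=insert
auto.create=true
# the JDBC sink needs records with schemas, e.g. JSON with an embedded
# schema (schemas.enable=true) or Avro with a schema registry
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=true

You would launch it with the connect-standalone script from the Kafka bin directory, alongside a worker properties file.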

OneCricketeer
  • No, I am not trying to use Spark Streaming. I am getting output through the Kafka consumer, and from there I am trying to insert it into MySQL – asd May 19 '21 at 01:17
  • You have also specified that this is not valid JSON format. Can you please show how a valid format looks? I will change accordingly – asd May 19 '21 at 01:18
  • Then why are you calling `spark = get_connection(self)`? Or using Spark code at all? Put your data here and it'll tell you what's wrong (your outermost brackets should be `[]`, or you need keys for each value in that object): https://jsonformatter.curiousconcept.com/ Other than that (assuming your print_details function actually works, but I don't think it does, due to other syntax issues), like I said, Kafka really doesn't matter here. Make a simpler script that starts with a list of JSON strings and parses/writes them to MySQL; once you have that working well, replace that list with your consumer – OneCricketeer May 19 '21 at 12:02
  • Plus, since Kafka is an implementation detail, there are plenty of other questions showing exactly what you want - https://stackoverflow.com/questions/4251124/inserting-json-into-mysql-using-python or with pandas https://stackoverflow.com/questions/40450591/converting-json-to-sql-table – OneCricketeer May 19 '21 at 12:13