5

I am running some code in Python 3.7. The library was installed with:

pip3 install kafka

Here is the code:

import random
import time, calendar
from random import randint
from kafka import KafkaProducer
from kafka import errors
from json import dumps
from time import sleep


def write_data(producer, *, data_cnt=20000, max_price=100000,
               topic="payment_msg", interval=0.5):
    """Publish randomly generated payment records through ``producer``.

    Each record is a dict with the keys ``createTime``, ``orderId``,
    ``payAmount``, ``payPlatform`` and ``provinceId``. Order ids start one
    above the current Unix timestamp and increase by one per record.

    Args:
        producer: Object exposing ``send(topic, value=...)`` and ``flush()``,
            such as the producer returned by ``create_producer``.
        data_cnt: Number of records to send.
        max_price: Scale factor for ``payAmount``, which is computed as
            ``max_price * random.random()``.
        topic: Topic name passed to ``producer.send``.
        interval: Seconds to sleep after each record.
    """
    order_id = calendar.timegm(time.gmtime())

    for _ in range(data_cnt):
        order_id += 1
        # The random draws happen in the same order as before: amount,
        # platform, province.
        cur_data = {
            "createTime": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
            "orderId": order_id,
            "payAmount": max_price * random.random(),
            # About 90% of the records get platform 0, the rest platform 1.
            "payPlatform": 0 if random.random() < 0.9 else 1,
            "provinceId": randint(0, 6),
        }
        producer.send(topic, value=cur_data)
        sleep(interval)

    # send() only queues a record for asynchronous delivery; flush so that
    # nothing still buffered is dropped when the script exits.
    producer.flush()

def create_producer():
    """Connect to the Kafka broker at ``kafka:9092`` and return the producer.

    Makes up to six connection attempts and sleeps 10 seconds after every
    attempt that fails with ``NoBrokersAvailable``. The returned producer
    serializes each value as UTF-8 encoded JSON.

    Raises:
        RuntimeError: If all six attempts fail.
    """
    print("Connecting to Kafka brokers")
    attempts_left = 6
    while attempts_left > 0:
        attempts_left -= 1
        try:
            client = KafkaProducer(
                bootstrap_servers=['kafka:9092'],
                value_serializer=lambda payload: dumps(payload).encode('utf-8'),
            )
        except errors.NoBrokersAvailable:
            print("Waiting for brokers to become available")
            sleep(10)
        else:
            print("Connected to Kafka")
            return client

    raise RuntimeError("Failed to connect to brokers within 60 seconds")

if __name__ == '__main__':
    # Script entry point: connect to Kafka, then stream the generated records.
    write_data(create_producer())

However, an error shows up:

Traceback (most recent call last):
  File "generate_source_data.py", line 22, in <module>
    from kafka import KafkaProducer
  File "/usr/local/lib/python3.7/site-packages/kafka/__init__.py", line 23, in <module>
    from kafka.producer import KafkaProducer
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/__init__.py", line 4, in <module>
    from .simple import SimpleProducer
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/simple.py", line 54
    return '<SimpleProducer batch=%s>' % self.async

What is the problem?

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
user824624
  • 7,077
  • 27
  • 106
  • 183

1 Answer

8

I found the answer here: https://github.com/dpkp/kafka-python/issues/1566.

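Run `pip3 uninstall kafka` to remove the `kafka` package, then install the maintained client with `pip3 install kafka-python`. The `kafka` package on PyPI is an outdated release whose code uses `async` as an attribute name; `async` became a reserved keyword in Python 3.7, so importing that package fails with a syntax error.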

user824624
  • 7,077
  • 27
  • 106
  • 183