
First, I am taking baby steps in Python and Kafka. Let's say I have listA = [item1, item2, item3] and every item of listA is a producer on a topic. What I want is to dynamically add/remove items to/from listA and have them immediately become producers; also, every item should run on its own thread, since they should be independent.

So basically I am trying to scale the application.

So far I have tried to hard-code every producer item and run each one in its own terminal.

Each item looks like this:

from pykafka import KafkaClient
import json
from datetime import datetime
import uuid
import time


input_file = open('./data/item1.json')
json_array = json.load(input_file)
coordinates = json_array['features'][0]['geometry']['coordinates']

# Generate uuid


def generate_uuid():
    return uuid.uuid4()


# Kafka producer
client = KafkaClient(hosts="localhost:9092")

topic = client.topics['test_kafka2']
producer = topic.get_sync_producer()


# Generate all coordinates
def generate_coordinates(coordinates):
    # new_coordinates = []
    i = 0
    while i < len(coordinates):
        data = {}
        data['class'] = 201
        data['key'] = str(data['class']) + '_' + str(generate_uuid())
        data['time_stamp'] = str(datetime.utcnow())
        data['longitude'] = coordinates[i][0]
        data['latitude'] = coordinates[i][1]
        message = json.dumps(data)
        producer.produce(message.encode('ascii'))
        time.sleep(1)

        # If the item reaches the last coordinate, reverse direction
        if i == len(coordinates)-1:
            coordinates = coordinates[::-1]
            i = 0
        else:
            i += 1
    # return new_coordinates


generate_coordinates(coordinates)
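
To make the idea concrete, this is roughly what I am aiming for (just a rough, untested sketch: one thread and one producer per item, plus a stop event so an item can be removed at runtime; the ./data/<item>.json paths simply mirror the naming above):

from pykafka import KafkaClient
import json
from datetime import datetime
import threading
import uuid
import time


client = KafkaClient(hosts="localhost:9092")
topic = client.topics['test_kafka2']

threads = {}  # item name -> (thread, stop_event)


def run_item(item, stop_event):
    # Produce the coordinates of one item until it is asked to stop
    with open('./data/{}.json'.format(item)) as f:
        coordinates = json.load(f)['features'][0]['geometry']['coordinates']
    producer = topic.get_sync_producer()  # one producer per thread
    i = 0
    while not stop_event.is_set():
        data = {
            'class': 201,
            'key': '201_' + str(uuid.uuid4()),
            'time_stamp': str(datetime.utcnow()),
            'longitude': coordinates[i][0],
            'latitude': coordinates[i][1],
        }
        producer.produce(json.dumps(data).encode('ascii'))
        time.sleep(1)
        if i == len(coordinates) - 1:
            coordinates = coordinates[::-1]
            i = 0
        else:
            i += 1


def add_item(item):
    # Start a new producer thread for this item
    stop_event = threading.Event()
    t = threading.Thread(target=run_item, args=(item, stop_event), daemon=True)
    t.start()
    threads[item] = (t, stop_event)


def remove_item(item):
    # Ask the item's thread to stop and wait for it to finish
    t, stop_event = threads.pop(item)
    stop_event.set()
    t.join()


for item in ['item1', 'item2', 'item3']:
    add_item(item)

# Keep the main thread alive so the daemon producer threads keep running
while True:
    time.sleep(10)

With this layout I could call add_item('item4') or remove_item('item2') at any time, but I am not sure whether one producer per thread is the right approach with pykafka, or how this should be structured so it scales properly.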
  • What was the outcome of your attempt? Did you get any errors? or did you get some output but not the desired one? Could you please share some details? – Lalit May 15 '20 at 11:24
  • It works fine, I am just unable to scale it right now @Lalit – bihire boris May 15 '20 at 11:42
