First off, I'm taking baby steps in Python and Kafka. Let's say I have listA = [item1, item2, item3], and every item of listA is a producer on a topic. What I want is to dynamically add/remove items to/from listA and have them immediately become producers; every item should also run on its own thread, since they should be independent of each other.
So basically I am trying to scale the application.
So far I have tried to hard-code every producer item and run each one in its own terminal. Each item looks like this:
from pykafka import KafkaClient
import json
from datetime import datetime
import uuid
import time
with open('./data/item1.json') as input_file:
    json_array = json.load(input_file)
coordinates = json_array['features'][0]['geometry']['coordinates']

# Generate a unique id for the message key
def generate_uuid():
    return uuid.uuid4()

# Kafka producer
client = KafkaClient(hosts="localhost:9092")
topic = client.topics['test_kafka2']
producer = topic.get_sync_producer()

# Walk all coordinates, producing one message per second
def generate_coordinates(coordinates):
    i = 0
    while i < len(coordinates):
        data = {}
        data['class'] = 201
        data['key'] = str(data['class']) + '_' + str(generate_uuid())
        data['time_stamp'] = str(datetime.utcnow())
        data['longitude'] = coordinates[i][0]
        data['latitude'] = coordinates[i][1]
        message = json.dumps(data)
        producer.produce(message.encode('ascii'))
        time.sleep(1)
        # If the item reaches the last coordinate, reverse and start over
        if i == len(coordinates) - 1:
            coordinates = coordinates[::-1]
            i = 0
        else:
            i += 1

generate_coordinates(coordinates)
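
To get past hard-coding one script per item, what I'm imagining is something like the sketch below: a manager that keeps one thread per item and lets me add or remove items at runtime. This is only a rough sketch of the idea, reusing the same pykafka setup as above; the ProducerManager class, run_item function, and the per-item stop events are names I made up, and I'm assuming each item's coordinates live in ./data/<item>.json like item1's do.

import threading
import json
import time
import uuid
from datetime import datetime
from pykafka import KafkaClient

client = KafkaClient(hosts="localhost:9092")
topic = client.topics['test_kafka2']

def run_item(item_name, stop_event):
    # One producer per thread, so the items stay independent
    producer = topic.get_sync_producer()
    with open(f'./data/{item_name}.json') as f:
        coordinates = f and json.load(f)['features'][0]['geometry']['coordinates']
    i = 0
    while not stop_event.is_set():
        data = {
            'class': 201,
            'key': '201_' + str(uuid.uuid4()),
            'time_stamp': str(datetime.utcnow()),
            'longitude': coordinates[i][0],
            'latitude': coordinates[i][1],
        }
        producer.produce(json.dumps(data).encode('ascii'))
        time.sleep(1)
        # Same back-and-forth walk over the coordinates as above
        if i == len(coordinates) - 1:
            coordinates = coordinates[::-1]
            i = 0
        else:
            i += 1
    producer.stop()

class ProducerManager:
    def __init__(self):
        self.items = {}  # item name -> (thread, stop_event)

    def add(self, item_name):
        # Adding an item spawns a thread that immediately starts producing
        if item_name in self.items:
            return
        stop_event = threading.Event()
        t = threading.Thread(target=run_item, args=(item_name, stop_event), daemon=True)
        self.items[item_name] = (t, stop_event)
        t.start()

    def remove(self, item_name):
        # Removing an item signals its thread to stop and waits for it
        t, stop_event = self.items.pop(item_name)
        stop_event.set()
        t.join()

manager = ProducerManager()
manager.add('item1')
manager.add('item2')
time.sleep(10)
manager.remove('item1')  # item1 stops, item2 keeps producing

Is a thread-per-item manager like this the right direction for scaling this, given that each item mostly sleeps between messages?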