I recently started using Python to send messages to Kafka. Simple byte messages work. Now I have JSON data that I need to send to a Kafka topic, where it will be consumed by a Java application.
I tried to find out how to convert the JSON to a byte array (which is what the Java application expects as the payload) and came up with the Python script below. It fails because the JSON contains a few boolean values: I get a type error, since JSON true and Python True differ in case. I then tried enclosing the JSON in single quotes, but got the error 'EOL while scanning string literal'. I will only know whether I can send this data to Kafka once this error is fixed, so for now I am stuck on the conversion. My code and the JSON are below.
JSON:
{
  "header": {
    "activityId": "550",
    "timeStamp": "1490093093000",
    "sequencingId": 1
  },
  "queueId": "604",
  "contextRef": "SLIP.UPDATE",
  "state": {
    "slips": [{
      "id": "550",
      "creationDate": "2017-01-30T14:14:14.000+0000",
      "accountRef": "1",
      "customerRef": "2",
      "source": {
        "channelRef": "K"
      },
      "receipt": "O/0000002/0000487",
      "isSettled": true,
      "isConfirmed": true,
      "lines": {
        "number": 1,
        "win": 1,
        "lose": 0,
        "voided": 0
      }
    }]
  }
}
Python script:
#!/usr/bin/python
import json
from kafka import KafkaProducer

KAFKA_TOPIC = 'slips'
KAFKA_BROKERS = '172.17.0.1:9092'
# Serialize each value to JSON text, then encode it as UTF-8 bytes
producer = KafkaProducer(
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
    bootstrap_servers=KAFKA_BROKERS)
messages = '{
"header": {
"activityId": "550",
"timeStamp": "1490093093000",
"sequencingId": 1
},
"queueId": "604",
"contextRef": "SLIP.UPDATE",
"state": {
"slips": [{
"id": "550",
"creationDate": "2017-01-30T14:14:14.000+0000",
"accountRef": "1",
"customerRef": "2",
"source": {
"channelRef": "K"
},
"receipt": "O/0000002/0000487",
"isSettled": true,
"isConfirmed": true,
"lines": {
"number": 1,
"win": 1,
"lose": 0,
"voided": 0
}
}]
}
}'
info_as_json = json.loads(messages)
producer.send(KAFKA_TOPIC, info_as_json)
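The conversion step can be tried on its own, without Kafka. The following is a minimal sketch, assuming the JSON text is kept in a triple-quoted string (a single-quoted literal cannot span several lines, which is what the 'EOL while scanning string literal' error points at). The helper name to_payload and the shortened sample are illustrative only.

```python
import json

# Triple quotes allow a string literal to span several lines.
raw = """
{
  "queueId": "604",
  "contextRef": "SLIP.UPDATE",
  "state": {"slips": [{"id": "550", "isSettled": true, "isConfirmed": true}]}
}
"""

def to_payload(text):
    """Parse JSON text and re-serialize it as UTF-8 bytes (hypothetical helper)."""
    data = json.loads(text)  # JSON true/false become Python True/False
    return json.dumps(data).encode("utf-8")

payload = to_payload(raw)
print(type(payload).__name__)  # bytes
```

Because the text is parsed by json.loads rather than pasted in as a Python dict literal, the lowercase true never has to be valid Python.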
The consumer received messages fine as long as I was publishing messages like:
messages = [b'hello kafka', b'I am sending', b'3 test messages']
Consumer:
#!/usr/bin/python
import sys
from kafka import KafkaConsumer
KAFKA_TOPIC = 'slips'
KAFKA_BROKERS = '172.17.0.1:9092'
consumer = KafkaConsumer(bootstrap_servers=KAFKA_BROKERS,auto_offset_reset='earliest')
consumer.subscribe([KAFKA_TOPIC])
try:
    for message in consumer:
        print(message.value)
except KeyboardInterrupt:
    sys.exit()
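The consumer above prints the raw bytes of each message. A hedged variant that decodes JSON payloads back into Python objects could look like the sketch below. It assumes the kafka-python package and passes a value_deserializer to KafkaConsumer; the names decode_value and consume are made up for illustration.

```python
import json

def decode_value(raw_bytes):
    """Turn UTF-8 encoded JSON bytes back into Python objects."""
    return json.loads(raw_bytes.decode("utf-8"))

def consume(topic, brokers):
    """Print every decoded message on the topic until interrupted."""
    # Imported here so decode_value can be tried without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=brokers,
        auto_offset_reset="earliest",
        value_deserializer=decode_value,
    )
    try:
        for message in consumer:
            print(message.value)  # a dict, not raw bytes
    except KeyboardInterrupt:
        consumer.close()
```

It would be called as consume('slips', '172.17.0.1:9092'). Note that this variant would fail on the earlier plain-text test messages such as b'hello kafka', since they are not valid JSON.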
Update:
I replaced the single quotes around the JSON string with triple quotes, and the producer code no longer raises an error. However, the consumer is not consuming the messages, or at least it is not printing them as I would expect.
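One possible cause worth ruling out, though not confirmed here: KafkaProducer.send in kafka-python is asynchronous and only queues the record, so a short script can exit before the message is actually transmitted. The sketch below, under that assumption, waits on the future returned by send and flushes before closing. The function names serialize and send_and_wait are hypothetical.

```python
import json

def serialize(value):
    """Serialize a Python object to UTF-8 encoded JSON bytes."""
    return json.dumps(value).encode("utf-8")

def send_and_wait(topic, brokers, value, timeout=10):
    """Send one message and block until the broker acknowledges it."""
    # Imported here so serialize can be tried without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=brokers, value_serializer=serialize)
    future = producer.send(topic, value)     # returns immediately
    metadata = future.get(timeout=timeout)   # raises if delivery failed
    producer.flush()                         # make sure nothing is still buffered
    producer.close()
    return metadata
```

If delivery fails, future.get raises an exception instead of failing silently, which at least shows whether the problem is on the producer or the consumer side.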