
I recently tried to use Python to send messages to Kafka. Sending simple byte messages works, but now I have JSON data that I need to send to a Kafka topic, where it will be consumed by a Java application.

I tried to find out how to convert the JSON to a byte array, since that is what the Java application expects as the payload, and came up with the Python script below. It failed because the JSON contains a few boolean values, and JSON writes them as true while Python writes them as True, so I get an error. I then tried enclosing the JSON in single quotes, but got the error 'EOL while scanning string literal'. Until I fix this error I cannot tell whether the data actually reaches Kafka, so for now I am stuck on the conversion step. My code and the JSON are below.

JSON:

{
  "header": {
    "activityId": "550",
    "timeStamp": "1490093093000",
    "sequencingId": 1
  },
  "queueId": "604",
  "contextRef": "SLIP.UPDATE",
  "state": {
    "slips": [{
      "id": "550",
      "creationDate": "2017-01-30T14:14:14.000+0000",
      "accountRef": "1",
      "customerRef": "2",
      "source": {
        "channelRef": "K"
      },
      "receipt": "O/0000002/0000487",
      "isSettled": true,
      "isConfirmed": true,
      "lines": {
        "number": 1,
        "win": 1,
        "lose": 0,
        "voided": 0
      }
    }]
  }
}
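The case difference between JSON's true and Python's True only matters when JSON is pasted into Python source as a literal. If the JSON stays inside a string and goes through json.loads, the parser maps true to True, and json.dumps maps it back to true. A minimal sketch, using a trimmed-down version of the payload above (the variable names are illustrative, not from the original script), of turning JSON text into the UTF-8 byte array a Java consumer would receive:

```python
import json

# A triple-quoted string may span several lines; '...' and "..." may not
raw_json = """{
  "header": {"activityId": "550", "sequencingId": 1},
  "state": {"slips": [{"id": "550", "isSettled": true, "isConfirmed": true}]}
}"""

# json.loads turns JSON text into Python objects: true -> True
data = json.loads(raw_json)
slip = data["state"]["slips"][0]
print(slip["isSettled"])

# json.dumps turns the objects back into JSON text (True -> true);
# .encode("utf-8") produces the bytes that go into the Kafka record
payload = json.dumps(data).encode("utf-8")
print(type(payload).__name__)
```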

Python script:

#!/usr/bin/python

import json

from kafka import KafkaProducer

KAFKA_TOPIC = 'slips'
KAFKA_BROKERS = '172.17.0.1:9092'

# Serialize every value to UTF-8 encoded JSON bytes before sending
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'),
                         bootstrap_servers=KAFKA_BROKERS)

# NOTE: a single-quoted string literal cannot span multiple lines,
# so this assignment fails with "EOL while scanning string literal"
messages = '{
  "header": {
    "activityId": "550",
    "timeStamp": "1490093093000",
    "sequencingId": 1
  },
  "queueId": "604",
  "contextRef": "SLIP.UPDATE",
  "state": {
    "slips": [{
      "id": "550",
      "creationDate": "2017-01-30T14:14:14.000+0000",
      "accountRef": "1",
      "customerRef": "2",
      "source": {
        "channelRef": "K"
      },
      "receipt": "O/0000002/0000487",
      "isSettled": true,
      "isConfirmed": true,
      "lines": {
        "number": 1,
        "win": 1,
        "lose": 0,
        "voided": 0
      }
    }]
  }
}'

# Parse the JSON text into Python objects (true -> True)
info_as_json = json.loads(messages)

producer.send(KAFKA_TOPIC, info_as_json)
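Because the producer's value_serializer already calls json.dumps, the JSON string and the json.loads step are not strictly needed: the payload can be written directly as a Python dict using Python's True. A sketch, with the serializer lambda pulled out into a named function; serialize_value is a hypothetical name, and the dict is trimmed for brevity:

```python
import json


def serialize_value(value):
    """Same logic as the producer's value_serializer lambda: object -> JSON -> UTF-8 bytes."""
    return json.dumps(value).encode("utf-8")


# Written as a Python literal, so booleans use Python's True/False
# (fields trimmed for brevity)
message = {
    "header": {"activityId": "550", "timeStamp": "1490093093000", "sequencingId": 1},
    "queueId": "604",
    "contextRef": "SLIP.UPDATE",
    "state": {
        "slips": [{
            "id": "550",
            "isSettled": True,
            "isConfirmed": True,
            "lines": {"number": 1, "win": 1, "lose": 0, "voided": 0},
        }]
    },
}

# json.dumps writes Python's True as JSON's true in the resulting bytes
wire_bytes = serialize_value(message)
```

With the producer from the question, producer.send(KAFKA_TOPIC, message) would apply the same serialization to the dict.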

The consumer received messages without problems while I was publishing plain byte messages like:

messages = [b'hello kafka', b'I am sending', b'3 test messages']

Consumer:

#!/usr/bin/python

import sys
from kafka import KafkaConsumer

KAFKA_TOPIC = 'slips'
KAFKA_BROKERS = '172.17.0.1:9092'

consumer = KafkaConsumer(bootstrap_servers=KAFKA_BROKERS,auto_offset_reset='earliest')

consumer.subscribe([KAFKA_TOPIC])
try:
    for message in consumer:
        print(message.value)
except KeyboardInterrupt:
    sys.exit()
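The consumer above prints message.value, which is the raw bytes of the record, so a JSON message shows up as a bytes literal such as b'{"header": ...}' rather than as a Python object. kafka-python's KafkaConsumer accepts a value_deserializer callable that mirrors the producer's value_serializer. A sketch of such a function (deserialize_value is a hypothetical name) that could be passed as KafkaConsumer(..., value_deserializer=deserialize_value). Since the consumer uses auto_offset_reset='earliest', the topic may still hold the earlier plain-text test messages, which are not valid JSON, so the sketch falls back to returning the raw bytes for those:

```python
import json


def deserialize_value(raw):
    """Inverse of the producer's serializer: UTF-8 bytes -> JSON -> Python object."""
    if raw is None:
        # Records with a null value carry nothing to decode
        return None
    try:
        return json.loads(raw.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # e.g. the earlier b'hello kafka' test messages: keep the raw bytes
        return raw


# Example record value, shaped like what the consumer would receive
record_value = b'{"queueId": "604", "state": {"slips": [{"isSettled": true}]}}'
decoded = deserialize_value(record_value)
print(decoded["state"]["slips"][0]["isSettled"])
```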

Update:

I added triple quotes around the JSON string, and the producer code no longer raises an error. However, the consumer does not seem to receive the messages; at least, it does not print them as I would expect.

albus_c
T Anna
  • Possible duplicate of [Pythonic way to create a long multi-line string](https://stackoverflow.com/questions/10660435/pythonic-way-to-create-a-long-multi-line-string) – tevemadar Jul 21 '18 at 15:07
  • can you please try to replace this part in the script adding three double quotes before and after, i.e. like: messages = """{ "header": { "activityId": "550", # etc etc "lose": 0, "voided": 0 } }] } }""" – Antonino Jul 21 '18 at 15:08
  • Thanks, it helped. But my consumer is not picking up the message. Do you find any issue with my producer code? – T Anna Jul 21 '18 at 19:34
  • Show how you're consuming the data – OneCricketeer Jul 22 '18 at 04:31
  • I have edited the question for the consumer code. – T Anna Jul 22 '18 at 05:52

1 Answer


Finally, I was able to consume the messages. It seems the problem was on the producer side. I went through some posts on Stack Overflow, added the two changes below to my producer code, and it just worked.

1) Set linger_ms=10 when initializing the producer:

producer = KafkaProducer(value_serializer=lambda v:json.dumps(v).encode('utf-8'),bootstrap_servers=KAFKA_BROKERS, linger_ms=10)

2) Flush the producer after sending the message:

producer.flush()

I have yet to find out why my producer worked without these changes for simple byte messages but not for the JSON.
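A likely explanation, though not confirmed against the original byte-message script: in kafka-python, producer.send() is asynchronous. It appends the record to an in-memory buffer and returns a future, and a background thread delivers batches to the broker later, so a script that exits right after send() may end before the record is delivered. producer.flush() blocks until all buffered records have been sent, while linger_ms only controls how long the producer waits to fill a batch, so flush() is most likely the change that mattered. Calling get() on the returned future additionally raises an exception if delivery failed instead of failing silently. A small helper sketch; send_and_wait is a hypothetical name, and producer is assumed to be a kafka-python KafkaProducer (or anything with the same send()/flush() methods):

```python
def send_and_wait(producer, topic, value, timeout=10):
    """Send one record and block until it is delivered or delivery fails.

    Assumes kafka-python's API: send() returns a future whose get()
    returns the record metadata or raises if delivery failed.
    """
    future = producer.send(topic, value)
    # Push out everything still sitting in the producer's buffer
    producer.flush(timeout=timeout)
    # Raises on failure (e.g. a timeout) instead of failing silently
    return future.get(timeout=timeout)
```

In the question's script this would replace the bare producer.send(KAFKA_TOPIC, info_as_json) call; the returned metadata includes fields such as topic, partition and offset.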

T Anna