
I would like some help with Kafka.

Is it possible to consume UTF-8 (Japanese) messages with kafka-console-consumer.sh that were produced by kafka-python?

  • kafka-python code
self._client = KafkaProducer(
    bootstrap_servers=bootstrap_servers,
    api_version=api_version,
    value_serializer=lambda m: json.dumps(m).encode('utf-8'))

for message in data:
    self._client.send(topic, message)
  • kafka-console-consumer.sh
./kafka-console-consumer.sh --bootstrap-server msk.amazonaws.com:9092 --topic meetings --from-beginning --property value.deserializer=org.apache.kafka.common.serialization.BytesDeserializer
  • Result
"uuid": "d/U55wRdSj60yJBcYVpt8A==", "id": 4459052115, "topic": "\x5Cu6e21\x5Cu8fba \x5Cu88d5\x5Cu306e\x5Cu30d1\x5Cu30fc\x5Cu30bd\x5Cu30ca\x5Cu30eb\x5Cu30df\x5Cu30fc\x5Cu30c6\x5Cu30a3\x5Cu30f3\x5Cu30b0\x5Cu30eb\x5Cu30fc\x5Cu30e0", "host": "\x5Cu6e21\x5Cu8fba \x5Cu88d5", "email": "y-watanabe@creationline.com", "user_type": "\x5Cu30e9\x5Cu30a4\x5Cu30bb\x5Cu30f3\x5Cu30b9\x5Cu6e08\x5Cu307f", "start_time": "2020-11-03T10:36:29Z", "end_time": "", "duration": "", "participants": 1, "has_pstn": false, "has_voip": false, "has_3rd_party_audio": false, "has_video": false, "has_screen_share": false, "has_recording": false, "has_sip": false, "dept": ""}

It seems the Japanese (UTF-8) strings are not deserialized correctly; they come out corrupted.

I am sending messages along the following route:

producer(kafka-python) -> AWS MSK (2.4.1.1) -> consumer(kafka-console-consumer.sh) 
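For reference, the bytes the producer puts on the wire can be inspected without a broker at all; a minimal sketch of what the value_serializer above emits (the Japanese sample value is made up for illustration):

```python
import json

# Hypothetical sample record, similar in shape to the meeting data above.
message = {"topic": "渡辺 裕のパーソナルミーティングルーム"}

# This mirrors the value_serializer configured on the KafkaProducer above.
encoded = json.dumps(message).encode("utf-8")

# The Japanese characters are already \uXXXX-escaped before Kafka sees them.
print(encoded)
```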
Yu Watanabe

1 Answer


There were a couple of things I was not understanding correctly.

  1. BytesDeserializer converts the record value into Java's Bytes type. So, as @codeflush.dev said, I was seeing the expected behaviour.

Deserialize a record value from a byte array into a value or object.

  2. I was missing the fact that json.dumps escapes non-ASCII characters. This is controlled by the ensure_ascii option.

If ensure_ascii is true (the default), the output is guaranteed to have all incoming non-ASCII characters escaped. If ensure_ascii is false, these characters will be output as-is.
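A quick illustration of that documented behaviour (the sample dict is hypothetical):

```python
import json

record = {"host": "渡辺 裕"}

# Default (ensure_ascii=True): non-ASCII characters become \uXXXX escapes.
print(json.dumps(record))                      # {"host": "\u6e21\u8fba \u88d5"}

# With ensure_ascii=False the characters are output as-is.
print(json.dumps(record, ensure_ascii=False))  # {"host": "渡辺 裕"}
```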

Setting ensure_ascii to False and then encoding to bytes solved the issue.

producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda m: json.dumps(m, ensure_ascii=False).encode('utf-8'))
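Since the fix only changes the serializer, it can be verified without a broker; a minimal sketch with the KafkaProducer call left out (the sample record is hypothetical):

```python
import json

# The same serializer as in the fixed producer, extracted as a function.
serialize = lambda m: json.dumps(m, ensure_ascii=False).encode("utf-8")

record = {"host": "渡辺 裕"}
raw = serialize(record)

# The bytes on the wire are now plain UTF-8, so a consumer that decodes
# them as UTF-8 prints the Japanese text directly instead of \uXXXX escapes.
print(raw.decode("utf-8"))
assert json.loads(raw.decode("utf-8")) == record
```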

Relates to

Saving utf-8 texts in json.dumps as UTF8, not as \u escape sequence
