
As a beginner, I'm exploring Apache Kafka and the confluent-kafka-python clients. When I sent simple messages from the producer, the consumer was able to consume them successfully. I then thought I'd try sending an image as the payload. With a 1MB PNG image, my producer was unable to produce messages. The error I encountered was:

  p.produce('mytopic', callback=delivery_report, key='hello', value=str_value)
cimpl.KafkaException: KafkaError{code=MSG_SIZE_TOO_LARGE,val=10,str="Unable to produce message: Broker: Message size too large"}

After some googling I found Kafka - Broker: Message size too large and How can I send large messages with Kafka (over 15MB)?, so I modified my server.properties (broker side) as shown below:

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0
max.message.bytes=1048576 
message.max.bytes=1048576 
replica.fetch.max.bytes=1048576

But I was still unable to fix the issue.

producer.py

from confluent_kafka import Producer
import base64
import time

# some_data_source = ['hey', 'hi']

with open("1mb.png", "rb") as imageFile:
    str_value = base64.b64encode(imageFile.read())

p = Producer({'bootstrap.servers': 'localhost:9092', 'compression.type': 'snappy'})

def delivery_report(err, msg):
    """ Called once for each message produced to indicate delivery result.
        Triggered by poll() or flush(). """
    if err is not None:
        print('Message delivery failed: {}'.format(err))
    else:
        print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))

for _ in range(2):
    # Trigger any available delivery report callbacks from previous produce() calls
    p.poll(0)

    # Asynchronously produce a message, the delivery report callback
    # will be triggered from poll() above, or flush() below, when the message has
    # been successfully delivered or failed permanently.
    p.produce('mytopic', callback=delivery_report, key='hello', value=str_value)

# Wait for any outstanding messages to be delivered and delivery report
# callbacks to be triggered.
p.flush()

consumer.py

from confluent_kafka import Consumer


c = Consumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
})

c.subscribe(['mytopic'])

while True:
    msg = c.poll(1.0)

    if msg is None:
        continue
    if msg.error():
        print("Consumer error: {}".format(msg.error()))
        continue

    print('Received message: {}'.format(msg.value().decode('utf-8')))

c.close()

Do I need to add any params, or am I missing something in the configs? Any help would be appreciated.

Thanks


It doesn't look like you changed the broker defaults at all; 1048576 bytes is still roughly the default 1MB limit. (Note also that max.message.bytes is a per-topic setting; the broker-level property in server.properties is message.max.bytes.)
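For example, actually raising the broker-side limit (to a hypothetical 2MB here; pick whatever ceiling fits your payloads) would look something like this in server.properties:

```
# server.properties -- hypothetical 2MB limit, larger than the ~1MB default
message.max.bytes=2000000
replica.fetch.max.bytes=2000000
```

Remember to restart the broker after changing these.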

For your client error, you need to add message.max.bytes to the Producer config.

If you need any other client properties, such as the consumer's fetch.max.bytes, those are documented here:

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
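As a minimal sketch, assuming a hypothetical 2MB ceiling (base64-encoding a 1MB image inflates it to roughly 1.37MB, which is past the ~1MB default), the adjusted producer config could look like:

```python
# Hypothetical producer config allowing messages up to ~2MB.
# 'message.max.bytes' here is the librdkafka client-side limit; the broker's
# message.max.bytes (and replica.fetch.max.bytes) must be at least as large.
producer_conf = {
    'bootstrap.servers': 'localhost:9092',
    'compression.type': 'snappy',
    'message.max.bytes': 2000000,
}

# from confluent_kafka import Producer
# p = Producer(producer_conf)
print(producer_conf['message.max.bytes'])  # 2000000
```

The 2000000 figure is an illustrative value, not a recommendation; keep the client limit at or below the broker's limit or produce requests will still be rejected.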


Overall, the recommendation would be to upload your images to a centralized filestore and send their URI locations through Kafka as plain strings. This increases throughput and reduces storage needs on the brokers, especially if you're sending or copying the same image data over multiple topics.
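As a rough illustration of the payload savings (the bucket name and URI below are hypothetical), compare the base64-encoded image against a plain URI string:

```python
import base64

image_bytes = b'\x89PNG' + b'\x00' * (1024 * 1024)  # stand-in for a ~1MB PNG
encoded = base64.b64encode(image_bytes)             # what the question currently sends
uri = b's3://my-image-bucket/1mb.png'               # hypothetical object-store location

print(len(encoded))  # well over the 1048576-byte default limit
print(len(uri))      # a few dozen bytes; default limits are plenty
# p.produce('mytopic', key='1mb.png', value=uri)  # tiny message, no config changes needed
```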

OneCricketeer