I have created a HighLevelProducer to publish messages to a topic stream that will be consumed by a ConsumerGroupStream using kafka-node. When I create multiple consumers from the same ConsumerGroup to consume from that same topic, only one partition is created and only one consumer is consuming. I have also tried to define the number of partitions for that topic, although I'm not sure whether it is required to define it upon creating the topic and, if so, how many partitions I will need in advance. In addition, is it possible to push an object to the Transform stream rather than a string? (I currently use JSON.stringify because otherwise I got [object Object] in the consumer.)

const { KafkaClient, HighLevelProducer, ProducerStream } = require('kafka-node');
const { Transform } = require('stream');

const myProducerStream = ({ kafkaHost, highWaterMark, topic }) => {
    const kafkaClient = new KafkaClient({ kafkaHost });
    const producer = new HighLevelProducer(kafkaClient);
    const options = {
        highWaterMark,
        kafkaClient,
        producer
    }; 

    kafkaClient.refreshMetadata([topic], err => {
        if (err) throw err; 
    }); 

    return new ProducerStream(options);
};

const transform = topic => new Transform({
    objectMode: true,
    decodeStrings: true,
    transform(obj, encoding, cb) {
        console.log(`pushing message ${JSON.stringify(obj)} to topic "${topic}"`);

        cb(null, {
            topic,
            messages: JSON.stringify(obj)
        });
    }
});

const publisher = (topic, kafkaHost, highWaterMark) => {
    const myTransform = transform(topic);
    const producer = myProducerStream({ kafkaHost, highWaterMark, topic });

    myTransform.pipe(producer);

    return myTransform;
};

The consumer:


const { ConsumerGroupStream } = require('kafka-node');
const { v4: uuidv4 } = require('uuid');

const createConsumerStream = (sourceTopic, kafkaHost, groupId) => {
    const consumerOptions = {
        kafkaHost,
        groupId,
        protocol: ['roundrobin'],
        encoding: 'utf8',
        id: uuidv4(),
        fromOffset: 'latest',
        outOfRangeOffset: 'earliest',
    };

    const consumerGroupStream = new ConsumerGroupStream(consumerOptions, sourceTopic);

    consumerGroupStream.on('connect', () => {
        console.log(`Consumer id: "${consumerOptions.id}" is connected!`);
    });

    consumerGroupStream.on('error', (err) => {
        console.error(`Consumer id: "${consumerOptions.id}" encountered an error: ${err}`);
    });

    return consumerGroupStream; 
};

const publisher = (func, destTopic, consumerGroupStream, kafkaHost, highWaterMark) => { 
    const messageTransform = new AsyncMessageTransform(func, destTopic);

    const resultProducerStream = myProducerStream({ kafkaHost, highWaterMark, topic: destTopic });

    consumerGroupStream.pipe(messageTransform).pipe(resultProducerStream);
}; 
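Since the consumer is configured with encoding: 'utf8', message values arrive as strings; a small helper can recover the original object (parseMessage is a hypothetical name, not part of kafka-node):

```javascript
// Hypothetical helper: recover the object that the producer serialized
// with JSON.stringify. Returns null for values that are not valid JSON.
const parseMessage = (message) => {
    try {
        return JSON.parse(message.value);
    } catch (err) {
        console.error(`Could not parse message value: ${err.message}`);
        return null;
    }
};

// Usage, e.g.: consumerGroupStream.on('data', (message) => {
//     const obj = parseMessage(message);
// });
```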

1 Answer

For the first question: the maximum number of consumers that can actively consume from a topic in the same group equals the number of partitions of that topic.

So if you have TopicA with 1 partition and you have 5 consumers in your consumer group, 4 of them will be idle.

If you have TopicA with 5 partitions and you have 5 consumers in your consumer group, all of them will be active and consuming messages from your topic.
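The assignment logic can be illustrated with a toy round-robin sketch in plain JavaScript (an illustration only, not kafka-node's actual assignor):

```javascript
// Toy round-robin assignment: partition i goes to consumer i mod N.
// Consumers beyond the partition count receive nothing and sit idle.
const assignRoundRobin = (partitions, consumers) => {
    const assignment = Object.fromEntries(consumers.map((c) => [c, []]));
    partitions.forEach((p, i) => {
        assignment[consumers[i % consumers.length]].push(p);
    });
    return assignment;
};

// 1 partition, 5 consumers: only c0 gets work, the other 4 are idle.
const onePartition = assignRoundRobin([0], ['c0', 'c1', 'c2', 'c3', 'c4']);

// 5 partitions, 5 consumers: every consumer gets exactly one partition.
const fivePartitions = assignRoundRobin([0, 1, 2, 3, 4], ['c0', 'c1', 'c2', 'c3', 'c4']);
```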

To control the number of partitions, create the topic from the CLI instead of relying on Kafka to auto-create it when you first publish messages.

To create a topic with a specific number of partitions:

bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 3 --partitions 3 --topic test

To alter the number of partitions of an existing topic:

bin/kafka-topics.sh --zookeeper zk_host:port/chroot --alter --topic test --partitions 40

Please note that you can only increase the number of partitions; you cannot decrease them.

Please refer to the Kafka docs: https://kafka.apache.org/documentation.html

Also, if you'd like to understand Kafka in more depth, check out the free book Kafka: The Definitive Guide: https://www.confluent.io/resources/kafka-the-definitive-guide/

Oras
Oras
  • Is it possible to specify the number of partitions from your Node.js application? In the kafka-node documentation they executed the following method: var topicsToCreate = [{ topic: 'topic1', partitions: 1, replicationFactor: 2 }] client.createTopics(topicsToCreate, (error, result) => { // result is an array of any errors if a given topic could not be created }); Although this didn't work for me. – Dan W Jan 07 '20 at 05:53
  • Didn't it work because you had an error in the callback function, or was the topic created with the default number of partitions? Also, which version of Kafka are you using? – Oras Jan 07 '20 at 20:40
  • I'm using the latest kafka version. The problem occurs when I create a producer stream and it only produces to one topic partition: https://stackoverflow.com/questions/59627078/high-level-producer-producing-only-to-single-partition-in-producerstream – Dan W Jan 07 '20 at 22:30