
In our Nest.js application we use the kafkajs client for Kafka. We need a way to monitor statistics, and one of the metrics is consumer lag.

I have been trying to figure out whether kafkajs provides anything for this, but have found nothing interesting. (The most interesting fields in the payload are: timestamp, offset, batchContext.firstOffset, batchContext.firstTimestamp, batchContext.maxTimestamp.)

Questions

Are there any ideas on how to log the lag value and other statistics provided by kafkajs?

Should I consider implementing my own statistics monitor to collect the required information in a Node application that uses the kafkajs client?

New Details 1

Following the documentation I can get batch.highWatermark, where

batch.highWatermark is the last committed offset within the topic partition. It can be useful for calculating lag.

Trying

  await consumer.run({
    eachBatchAutoResolve: true,
    eachBatch: async (data) => {
      console.log('Received data.batch.messages: ', data.batch.messages)
      console.log('Received data.batch.highWatermark: ', data.batch.highWatermark)
    },
  })

I get output like the following:

Received data.batch.messages:  [
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '144',
    key: null,
    value: <Buffer 68 65 6c 6c 6f 21>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '145',
    key: null,
    value: <Buffer 6f 74 68 65 72 20 6d 65 73 73 61 67 65>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  },
  {
    magicByte: 2,
    attributes: 0,
    timestamp: '1628877419958',
    offset: '146',
    key: null,
    value: <Buffer 6d 6f 72 65 20 6d 65 73 73 61 67 65 73>,
    headers: {},
    isControlRecord: false,
    batchContext: {
      firstOffset: '144',
      firstTimestamp: '1628877419958',
      partitionLeaderEpoch: 0,
      inTransaction: false,
      isControlBatch: false,
      lastOffsetDelta: 2,
      producerId: '-1',
      producerEpoch: 0,
      firstSequence: 0,
      maxTimestamp: '1628877419958',
      timestampType: 0,
      magicByte: 2
    }
  }
]
Received data.batch.highWatermark:  147

Are there any ideas on how to use batch.highWatermark in the lag calculation then?

Sergii
  • The batch.highWatermark is the offset of the last committed message for the current partition. By subtracting the message.offset from it you can estimate how far behind the consumer is: the bigger the difference, the bigger the lag. You may want to run a tally for each partition. https://kafka.js.org/docs/consuming – aggaton Jul 20 '22 at 16:47
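
For illustration, a minimal sketch of the per-partition calculation the comment describes. It assumes batch.highWatermark points one past the last stored offset (as the output above suggests: highWatermark 147 vs. last message offset 146), hence the -1:

await consumer.run({
  eachBatch: async ({ batch }) => {
    if (batch.messages.length === 0) return
    const lastMessage = batch.messages[batch.messages.length - 1]
    // Messages still remaining in the partition after this batch is processed
    const lag = Number(batch.highWatermark) - 1 - Number(lastMessage.offset)
    console.log(`topic=${batch.topic} partition=${batch.partition} lag=${lag}`)
  },
})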

2 Answers


It looks like the only way to get an offset lag metric is by using instrumentation events:

// The listener receives an instrumentation event; the lag values sit under event.payload
consumer.on(consumer.events.END_BATCH_PROCESS, (event) =>
  console.log(event.payload.offsetLagLow),
);

offsetLagLow measures the offset delta between the first message in the batch and the last offset in the partition (the highWatermark). You can also use offsetLag, but it is based on the last offset of the batch.

As @Sergii mentioned, there are some props available directly when you are using eachBatch (here are all the available methods on the batch prop). But you won't get those props if you are using eachMessage, so instrumentation events are the most universal approach.
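
For example, a rough sketch of collecting the most recent lag per topic/partition from that instrumentation event; the payload field names used here (topic, partition, offsetLag, offsetLagLow) should be checked against the kafkajs version you run:

const lagByPartition = {}

consumer.on(consumer.events.END_BATCH_PROCESS, (event) => {
  // Keep only the latest reading for each topic/partition
  const { topic, partition, offsetLag, offsetLagLow } = event.payload
  lagByPartition[`${topic}:${partition}`] = { offsetLag, offsetLagLow }
})

// Log (or expose to your metrics system) the collected values periodically
setInterval(() => console.log(lagByPartition), 10000)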

SleepWalker

This is the code we used to compute the lag for each client/group/topic/partition.

import { Kafka, AssignerProtocol } from 'kafkajs'

async function lag (clientConfig) {
    const status = []
    const kafkaClient = new Kafka(clientConfig)
    const admin = kafkaClient.admin()
    await admin.connect()

    // List every consumer group and describe them to get their members
    const groups = await admin.listGroups()
    const groupsNames = groups.groups.map(x => x.groupId)
    const gd = await admin.describeGroups(groupsNames)

    // Committed offsets per group: groupId -> topic -> partition -> offset
    const currentMapOfTopicOffsetByGroupId = {}
    for (const g of gd.groups) {
        const topicOffsets = await admin.fetchOffsets({ groupId: g.groupId })
        if (currentMapOfTopicOffsetByGroupId[g.groupId] === undefined) {
            currentMapOfTopicOffsetByGroupId[g.groupId] = {}
        }
        topicOffsets.forEach(to => {
            to.partitions.forEach(p => {
                if (currentMapOfTopicOffsetByGroupId[g.groupId][to.topic] === undefined) {
                    currentMapOfTopicOffsetByGroupId[g.groupId][to.topic] = {}
                }
                currentMapOfTopicOffsetByGroupId[g.groupId][to.topic][parseInt(p.partition)] = p.offset
            })
        })

        for (const m of g.members) {
            // memberMetadata lists the topics this member is subscribed to
            // (the assignment could be decoded the same way via
            // AssignerProtocol.MemberAssignment.decode, but it is not needed here)
            const memberMetadata = AssignerProtocol.MemberMetadata.decode(m.memberMetadata)
            for (const t of memberMetadata.topics) {

                // Latest offsets (high watermark) per partition of the topic
                const res = await admin.fetchTopicOffsets(t)

                res.forEach(r => {
                    if (currentMapOfTopicOffsetByGroupId[g.groupId][t] !== undefined) {
                        const committedOffset = parseInt(currentMapOfTopicOffsetByGroupId[g.groupId][t][parseInt(r.partition)])
                        // Lag = latest offset in the partition minus the group's committed offset
                        const lag = parseInt(r.high) - committedOffset
                        status.push({
                            HOST: m.clientHost,
                            STATE: g.state,
                            MEMBER_ID: m.memberId,
                            GROUP_ID: g.groupId,
                            TOPIC: t,
                            PARTITION: r.partition,
                            OFFSET: r.offset,
                            C_OFFSET: committedOffset,
                            LAG: lag
                        })
                    }
                })
            }
        }
    }

    await admin.disconnect()
    return status
}
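
For context, a minimal sketch of how this helper might be invoked; the clientId and broker address below are placeholders:

const clientConfig = {
  clientId: 'lag-monitor',      // placeholder client id
  brokers: ['localhost:9092'],  // placeholder broker address
}

lag(clientConfig)
  .then((status) => console.table(status))
  .catch((err) => console.error('Failed to compute lag', err))
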
Adda_25