
I am ramping up on NestJS microservices, using Kafka for messaging/communication.

I have two microservices sending data to the same Kafka topic (consumed by a third microservice). Both producer microservices subscribe to the reply topic. Data is not sent simultaneously. For the first microservice, data is pushed, the consumer processes it and sends back a reply, which is handled correctly. For the second microservice, data is pushed and processed by the consumer, but the microservice never handles the reply and the request ends with a timeout.

To illustrate the problem I am facing I have created 3 micro-services (code extracts below):

  • producer1 listening on HTTP port 3001, handling POST requests on /api/
  • producer2 listening on HTTP port 3002, handling POST requests on /api/
  • consumer subscribed to the Kafka topic "send_data" (broker on localhost:9092)

Data can be posted to http://localhost:3001/api/ (producer1), which expects a simple JSON payload: { "data": "<message>" }. The same applies to http://localhost:3002/api/ (producer2).

In each case, producer1 and producer2 forward that same data to the Kafka topic "send_data", which is consumed by the consumer microservice. producer1 and producer2 subscribe to the reply topic via subscribeToResponseOf('send_data').

The consumer microservice handles the message and sends back a response on the reply topic with the following JSON format: { "data": "Message received: <message>" }.

producer1 and producer2 return that response message as the body of the POST response.

RESULTS

1/ When running producer1 and the consumer alone:

curl -X POST http://localhost:3001/api/ -H 'Content-type: application/json' --data-raw '{ "data": "hello world!" }'
{"data":"Message received: hello world!"}

2/ When running producer2 and the consumer alone:

curl -X POST http://localhost:3002/api/ -H 'Content-type: application/json' --data-raw '{ "data": "hello world!" }'
{"data":"Message received: hello world!"}

3/ When running producer1, producer2, and the consumer together (producer1 started first, producer2 after):

curl -X POST http://localhost:3001/api/ -H 'Content-type: application/json' --data-raw '{ "data": "hello world!" }'
{"data":"Message received: hello world!"}

curl -X POST http://localhost:3002/api/ -H 'Content-type: application/json' --data-raw '{ "data": "hello world!" }'
{"statusCode":408,"message":"Request timeout"}

QUESTION

I am obviously missing something, but I could not find any explanation after searching the web.

All three microservices use the same groupId. The problem does not occur when each uses a different groupId.
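For reference, that workaround amounts to giving each producer its own consumer group in the ClientsModule options (the group names below are made up for illustration), so that each requester gets its own assignment of the reply-topic partition and therefore receives every reply:

    // in producer2's module registration (producer1 would use e.g. 'producer1-replies')
    consumer: {
      groupId: 'producer2-replies',
    },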

Any help to explain what is going on would be really appreciated !

CODE EXTRACT

producer module (only the clientId differs between producer1 and producer2):

    @Module({
      imports: [ClientsModule.register([
        {
          name: 'CONSUMER_MICROSERVICE',
          transport: Transport.KAFKA,
          options: {
            client: {
              clientId: 'producer1',
              brokers: ['localhost:9092'],
            },
            consumer: {
              groupId: 'concurrency-consumer',
            },
          }
        }
      ])],
      controllers: [AppController],
      providers: [AppService],
    })
    export class AppModule {}

producer service (controller is just passing the data)

export class AppService implements OnModuleInit {

  constructor(@Inject('CONSUMER_MICROSERVICE') private readonly consumerClient: ClientKafka) {}

  async sendData(sendDataDto: SendDataDto): Promise<ReturnDataDto> {
    const res = await this.consumerClient
      .send('send_data', JSON.stringify(sendDataDto))
      .pipe(
        timeout(5000),
        catchError((err) => {
          console.log('TIMEOUT ERROR');
          console.log(err);
          throw new HttpException('Request timeout', HttpStatus.REQUEST_TIMEOUT);
        }),
      )
      .toPromise();

    console.log(res);
    return res;
  }

  getData(): { message: string } {
    return { message: 'Welcome to producer1!' };
  }

  async onModuleInit() {
    this.consumerClient.subscribeToResponseOf('send_data');
    await this.consumerClient.connect();
  }
}

consumer main

  const app = await NestFactory.createMicroservice(
    AppModule,
    {
      transport: Transport.KAFKA,
      options: {
        client: {
          brokers: ['localhost:9092'],
        },
        consumer: {
          groupId: 'concurrency-consumer',
        },
      },
    })

consumer controller & service

export class AppController {
  constructor(private readonly appService: AppService) {}

  @MessagePattern('send_data')
  async handleSendData(@Payload(ValidationPipe) sendDataDto: SendDataDto) {
    return this.appService.handleSendData(sendDataDto);
  }

}

@Injectable()
export class AppService {

  async handleSendData(sendDataDto: SendDataDto): Promise<ReturnDataDto> {
    const res = sendDataDto as ReturnDataDto;
    res.data = 'Message received: ' + res.data;
    return res;
  }

}
Azrael
ANSWER

Answering my own question :)

This appears to be expected behavior, given the default Kafka configuration.

By replying to the request, the consumer produces a message on the reply topic send_data.reply.

In the case above, the Kafka broker has topic auto-creation enabled, and auto-created topics get a single partition by default (num.partitions=1 in Kafka's server.properties). producer1 and producer2 also consume the reply topic using the same groupId. Within a consumer group, each partition is assigned to exactly one consumer, so the single partition of send_data.reply is assigned to only one of the two producers. Every reply is therefore consumed exactly once, by whichever producer holds that partition, which explains why producer2 misses its reply.

POSSIBLE SOLUTIONS

1/ create topics outside of the NestJS app (recommended)

  • disable topic auto-creation on the Kafka server by setting auto.create.topics.enable to false in Kafka's server.properties

  • create the topics send_data and send_data.reply using the bin/kafka-topics.sh command, specifying a number of partitions greater than 1

=> NestJS will then push replies to different partitions. This can also be enforced at code level.
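As an illustration, the topic creation step could look like this (the broker address and partition counts are assumptions; adjust them to your setup):

    # in config/server.properties, disable auto-creation first:
    #   auto.create.topics.enable=false

    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
      --topic send_data --partitions 2
    bin/kafka-topics.sh --bootstrap-server localhost:9092 --create \
      --topic send_data.reply --partitions 2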

2/ other solution (not optimal):

  • leave topic auto-creation enabled

  • set num.partitions to a value greater than 1 in Kafka's server.properties

This is less recommended, as it is a broker-wide setting affecting every auto-created topic.

3/ other solution (unsolved and maybe not recommended):

  • replace NestJS's ClientKafka with a custom implementation exposing the admin functions of the underlying kafkajs client, allowing topics to be created from the NestJS app

Certainly not recommended; it appears safer to create the topics outside of the NestJS apps, as recommended here.
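Even without a custom ClientKafka, the kafkajs admin client can create the topics from a one-off setup script run before the services start. A minimal sketch, assuming kafkajs is installed and the broker runs on localhost:9092:

    import { Kafka } from 'kafkajs';

    async function createTopics() {
      const kafka = new Kafka({ clientId: 'topic-setup', brokers: ['localhost:9092'] });
      const admin = kafka.admin();
      await admin.connect();
      try {
        // createTopics resolves to false if the topics already exist
        await admin.createTopics({
          topics: [
            { topic: 'send_data', numPartitions: 2 },
            { topic: 'send_data.reply', numPartitions: 2 },
          ],
        });
      } finally {
        await admin.disconnect();
      }
    }

    createTopics().catch(console.error);

This keeps topic management out of the services themselves while still avoiding manual CLI steps.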

Any further comment will be appreciated !
