Ramping up on NestJS + Microservices using Kafka for messaging/communication ..
I have two microservices sending data to a same single kafka topic (consumed by a third microservice). These two producer microservices subscribed to the response. Data is not sent simultaneously. In the case of the first microservice, data is pushed, the consumer processes the data and sends back a reply which is properly processed. In the case of the second microservice, data is pushed and processed bu the consumer, but the microservice does not handle the reply and we end up with a time out.
To illustrate the problem I am facing I have created 3 micro-services (code extracts below):
- producer1 listening on HTTP port 9001 processing POST command on /api/
- producer2 listening on HTTP port 9002 processing POST command on /api/
- consumer listening to Kafka topic "send_data" on port 9092
Data can be posted on http://localhost:9001/api/ (producer1), which expects data format in a simple JSON { data: <message> } Same thing on http://localhost:9002/api/ (producer2)
In each case, producer1 and producer2 will forward that very same data to the kafka topic "send_data", to be consumed by the consumer microservice. producer1 and producer2 did subscribe to the reply topic via the subscribeToResponseOf('send_data')
consumer microservice handles the message and sends back a response to the reply topiuc with following JSON data format: { data: "Response: <message>" }.
producer1 and producer2 return that same response message as the response of the POST command.
RESULTS
1/ When running producer1 and consumer alone:
curl -X POST http://localhost:3001/api/ -H 'Content-type: application/json' --data-raw '{ "data": "hello world!" }'
{"data":"Message received: hello world!"}
2/ When runnning producer2 and consumer alone:
curl -X POST http://localhost:3002/api/ -H 'Content-type: application/json' --data-raw '{ "data": "hello world!" }'
{"data":"Message received: hello world!"}
3/ When running producer1 and producer2 and consumer together (producer1 is started first, producer2 started after)
curl -X POST http://localhost:3001/api/ -H 'Content-type: application/json' --data-raw '{ "data": "hello world!" }'
{"data":"Message received: hello world!"}
curl -X POST http://localhost:3002/api/ -H 'Content-type: application/json' --data-raw '{ "data": "hello world!" }'
{"statusCode":408,"message":"Request timeout"}
QUESTION
I am obviously missing something, but could not find any explanation after searching the web ...
All three microservices use the same groupId. I do not see the problem when using different groupId.
Any help to explain what is going on would be really appreciated !
CODE EXTRACT
producer module: (only clientId differ between producer1 and producer2)
@Module({
imports: [ClientsModule.register([
{
name: 'CONSUMER_MICROSERVICE',
transport: Transport.KAFKA,
options: {
client: {
clientId: 'producer1',
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'concurrency-consumer',
},
}
}
])],
controllers: [AppController],
providers: [AppService],
})
producer service (controller is just passing the data)
export class AppService implements OnModuleInit {
constructor(@Inject('CONSUMER_MICROSERVICE') private readonly consumerClient: ClientKafka) {}
async sendData(sendDataDto: SendDataDto): Promise<ReturnDataDto> {
const res = await this.consumerClient
.send('send_data', JSON.stringify(sendDataDto))
.pipe(
timeout(5000),
catchError((err) => {
console.log("TIMEOUT ERROR")
console.log(err);
throw new HttpException('Request timeout', HttpStatus.REQUEST_TIMEOUT)
})
)
.toPromise()
console.log(res);
return res;
}
getData(): { message: string } {
return { message: 'Welcome to producer1!' };
}
async onModuleInit() {
this.consumerClient.subscribeToResponseOf('send_data');
await this.consumerClient.connect();
}
}
consumer main
const app = await NestFactory.createMicroservice(
AppModule,
{
transport: Transport.KAFKA,
options: {
client: {
brokers: ['localhost:9092'],
},
consumer: {
groupId: 'concurrency-consumer'
}
},
})
consumer controller & service
export class AppController {
constructor(private readonly appService: AppService) {}
@MessagePattern('send_data')
async handleSendData(@Payload(ValidationPipe) sendDataDto: SendDataDto) {
return this.appService.handleSendData(sendDataDto);
}
}
@Injectable()
export class AppService {
async handleSendData(sendDataDto: SendDataDto): Promise<ReturnDataDto> {
var res = <ReturnDataDto>sendDataDto;
res.data = "Message received: " + res.data;
return Promise.resolve(res);
}
}