I have a Producer running on the main thread and a Consumer running on its own thread (std::thread). I have a simple program that sends a message using the Producer and then puts the main thread to sleep before trying to send another message.
Whenever my main thread goes to sleep, the program just exits. No exception, nothing. The same thing happens when I try to properly stop and delete my Consumer/Producer. Clearly I'm doing something wrong, but I cannot tell what, since I am not getting any kind of error out of my program. The last log message I see is the one I print right before putting the main thread to sleep.
I've put try-catch blocks inside main and inside my Consumer thread. I've also called std::set_terminate and added logging in the handler. When my program exits, neither the try-catch blocks nor the terminate handler catches anything.
Any suggestions?
UPDATE #1 [Source]
As Sid S pointed out, I'm missing the obvious: the source.
main.cc
#include <chrono>
#include <cstdlib>
#include <exception>
#include <future>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
// Plus the project header that declares KafkaMessenger and MessengerCode.

int main(int argc, char** argv) {
    std::cout << "% Main started." << std::endl;
    std::set_terminate([]() {
        std::cerr << "% Terminate occurred in main." << std::endl;
        std::abort();
    });
    try {
        using com::anya::core::networking::KafkaMessenger;
        using com::anya::core::common::MessengerCode;
        KafkaMessenger messenger;
        // Block until the messenger reports that it is connected.
        auto promise = std::promise<bool>();
        auto future = promise.get_future();
        messenger.Connect([&promise](MessengerCode code, std::string& message) {
            promise.set_value(true);
        });
        future.get();
        std::cout << "% Main connection successful." << std::endl;
        // Produce 5 messages 5 seconds apart.
        int number_of_messages_sent = 0;
        while (number_of_messages_sent < 5) {
            std::stringstream message;
            message << "message-" << number_of_messages_sent;
            auto message_send_promise = std::promise<bool>();
            auto message_send_future = message_send_promise.get_future();
            messenger.SendMessage(message.str(), [&message_send_promise](MessengerCode code) {
                std::cout << "% Main message sent" << std::endl;
                message_send_promise.set_value(true);
            });
            message_send_future.get();
            number_of_messages_sent++;
            std::cout << "% Main going to sleep for 5 seconds." << std::endl;
            std::this_thread::sleep_for(std::chrono::seconds(5));
        }
        // Disconnect from Kafka and cleanup.
        auto disconnect_promise = std::promise<bool>();
        auto disconnect_future = disconnect_promise.get_future();
        messenger.Disconnect([&disconnect_promise](MessengerCode code, std::string& message) {
            disconnect_promise.set_value(true);
        });
        disconnect_future.get();
        std::cout << "% Main disconnect complete." << std::endl;
    } catch (std::exception& exception) {
        std::cerr << "% Exception caught in main with error: " << exception.what() << std::endl;
        exit(1);
    }
    std::cout << "% Main exited." << std::endl;
    exit(0);
}
KafkaMessenger.cc [Consumer Section]
void KafkaMessenger::Connect(std::function<void(MessengerCode, std::string&)> impl) {
    assert(!running_.load());
    running_.store(true);
    // For the sake of brevity I've removed a whole bunch of Kafka configuration setup from the sample code.
    RdKafka::ErrorCode consumer_response = consumer_->start(topic_for_consumer, 0, RdKafka::Topic::OFFSET_BEGINNING);
    if (consumer_response != RdKafka::ERR_NO_ERROR) {
        running_.store(false);
        delete consumer_;
        delete producer_;
        error = RdKafka::err2str(consumer_response);
        impl(MessengerCode::CONNECT_FAILED, error);
        return;  // Do not start the consumer thread after a failed start.
    }
    auto consumer_thread_started_promise = std::promise<bool>();
    auto consumer_thread_started_future = consumer_thread_started_promise.get_future();
    // topic_for_consumer is a local, so it is captured by value here; a reference
    // to it would dangle once Connect returns while the thread keeps running.
    consumer_thread_ = std::thread([this, topic_for_consumer, &consumer_thread_started_promise]() {
        try {
            std::cout << "% Consumer thread started." << std::endl;
            consumer_thread_started_promise.set_value(true);
            while (running_.load()) {
                RdKafka::Message* message = consumer_->consume(topic_for_consumer, 0, 5000);
                switch (message->err()) {
                    case RdKafka::ERR_NO_ERROR: {
                        // The payload is not null-terminated, so build the string from its explicit length.
                        std::string message_string(static_cast<const char*>(message->payload()), message->len());
                        std::cout << "% Consumer received message: " << message_string << std::endl;
                        break;
                    }
                    default:
                        std::cerr << "% Consumer consumption failed: " << message->errstr() << " error code=" << message->err() << std::endl;
                        break;
                }
                // consume() hands back a message object on errors too, so free it on every path.
                delete message;
            }
            std::cout << "% Consumer shutting down." << std::endl;
            if (consumer_->stop(topic_for_consumer, 0) != RdKafka::ERR_NO_ERROR) {
                std::cerr << "% Consumer error while trying to stop." << std::endl;
            }
        } catch (std::exception& exception) {
            std::cerr << "% Caught exception in consumer thread: " << exception.what() << std::endl;
        }
    });
    consumer_thread_started_future.get();
    std::string message("Consumer connected");
    impl(MessengerCode::CONNECT_SUCCESS, message);
}
KafkaMessenger.cc [Producer Section]
void KafkaMessenger::SendMessage(std::string message, std::function<void(MessengerCode)> impl) {
    assert(running_.load());
    std::cout << "% Producer sending message." << std::endl;
    // Pass the string's character buffer as the payload. Passing &message would
    // send the bytes of the std::string object itself rather than its text.
    RdKafka::ErrorCode producer_response = producer_->produce(
        producer_topic_,
        RdKafka::Topic::PARTITION_UA,
        RdKafka::Producer::RK_MSG_COPY,
        const_cast<char*>(message.data()), message.length(), nullptr, nullptr);
    switch (producer_response) {
        case RdKafka::ERR_NO_ERROR: {
            std::cout << "% Producer Successfully sent (" << message.length() << " bytes)" << std::endl;
            impl(MessengerCode::MESSAGE_SEND_SUCCESS);
            break;
        }
        // Every failure is reported the same way, so the known error codes share one branch.
        case RdKafka::ERR__QUEUE_FULL:
        case RdKafka::ERR__UNKNOWN_PARTITION:
        case RdKafka::ERR__UNKNOWN_TOPIC:
        default: {
            std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
            impl(MessengerCode::MESSAGE_SEND_FAILED);
            break;
        }
    }
}
Output
When I run the main method, this is the output that I see in the console.
% Main started.
% Consumer thread started.
% Main connection successful.
% Producer sending message.
% Producer Successfully sent (9 bytes)
% Main message sent
% Main going to sleep for 5 seconds.
% Consumer received message: message-
After closer examination, I do not think that the sleep is the cause, because the same thing happens when I remove it. As you can see in the last log line, the Consumer prints the message that it received with the last character truncated. The payload should read message-0. So something somewhere is dying.
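One possible explanation for the truncated payload, sketched here without librdkafka: the payload travels as a raw pointer plus a length, so the sending side has to hand over the string's character buffer (not the address of the std::string object), and the receiving side has to use the reported length, because the bytes carry no terminating '\0'. CopyPayload, SendText and ReceiveText below are hypothetical helpers made up for this illustration; they are not part of librdkafka or of my classes.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for a transport that copies `len` raw bytes starting
// at `payload`, the way a produce call with a copy flag would.
std::vector<char> CopyPayload(const void* payload, std::size_t len) {
    const char* bytes = static_cast<const char*>(payload);
    return std::vector<char>(bytes, bytes + len);
}

// Sending side: pass the character buffer, not the address of the std::string object.
std::vector<char> SendText(const std::string& text) {
    return CopyPayload(text.data(), text.length());
}

// Receiving side: the bytes are not null-terminated, so use the explicit length.
std::string ReceiveText(const std::vector<char>& wire) {
    return std::string(wire.data(), wire.size());
}
```

With these helpers, ReceiveText(SendText("message-0")) yields the full nine-character string, whereas building a std::string from the bare pointer alone would read until it happened to hit a zero byte.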
UPDATE #2 [Stack Trace]
I came across this old but very useful post about catching signals and printing out the stack. I implemented this solution and now I can see more information about where things are crashing.
Error: signal 11:
0 main 0x00000001012e4eec _ZN3com4anya4core10networking7handlerEi + 28
1 libsystem_platform.dylib 0x00007fff60511f5a _sigtramp + 26
2 ??? 0x0000000000000000 0x0 + 0
3 main 0x00000001012f2866 rd_kafka_poll_cb + 838
4 main 0x0000000101315fee rd_kafka_q_serve + 590
5 main 0x00000001012f5d46 rd_kafka_flush + 182
6 main 0x00000001012e7f1a _ZN3com4anya4core10networking14KafkaMessenger10DisconnectENSt3__18functionIFvNS1_6common13MessengerCodeENS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEEEEE + 218
7 main 0x00000001012dbc45 main + 3221
8 libdyld.dylib 0x00007fff60290115 start + 1
9 ??? 0x0000000000000001 0x0 + 1
As part of my shutdown method I call producer_->flush(1000), and this call produces the stack trace above. If I remove it, the shutdown is fine. Clearly I am misconfiguring something that then causes this segfault when I attempt to flush.
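For anyone who wants to reproduce this kind of trace, here is a minimal sketch of the signal-handler approach, assuming a platform that provides <execinfo.h> (glibc or macOS). It is not the exact code from the post I followed, and the names PrintStackTrace, CrashHandler and InstallCrashHandlers are made up for this sketch. A segfault is delivered as a signal rather than thrown as a C++ exception, which is why neither try-catch nor std::set_terminate saw anything.

```cpp
#include <csignal>
#include <execinfo.h>  // backtrace, backtrace_symbols_fd (assumed available: glibc, macOS)
#include <unistd.h>    // STDERR_FILENO, _exit

// Captures the current call stack, writes it to stderr, and returns the frame count.
int PrintStackTrace() {
    void* frames[64];
    int count = backtrace(frames, 64);
    // Writes straight to the file descriptor, so no heap allocation is needed here.
    backtrace_symbols_fd(frames, count, STDERR_FILENO);
    return count;
}

// Signal handler: dump the stack, then exit immediately without further cleanup.
// Note: backtrace() is not formally async-signal-safe, so treat this as a
// debugging aid rather than production-grade crash reporting.
extern "C" void CrashHandler(int signal_number) {
    PrintStackTrace();
    _exit(128 + signal_number);
}

// Call once at the top of main().
void InstallCrashHandlers() {
    std::signal(SIGSEGV, CrashHandler);
    std::signal(SIGABRT, CrashHandler);
}
```

Calling InstallCrashHandlers() as the first statement in main is enough; the symbol names in the output are mangled and can be piped through c++filt.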
UPDATE #3 [Solution]
It turns out that the objects I use to log Kafka events and delivery reports were scoped to a method. This was a problem because the librdkafka library takes these by reference, so when my main runner method exited and cleanup commenced, these objects were already gone. I scoped the loggers to the class level and this fixed the crash.
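A minimal sketch of the lifetime rule behind the fix, using hypothetical stand-in types rather than the real librdkafka classes: EventCallback, Client, RecordingCallback and Messenger are all made up for illustration. The point is only that a callback object registered by address has to stay alive for as long as the client may invoke it, which includes the flush during shutdown.

```cpp
#include <string>
#include <vector>

// Hypothetical callback interface, standing in for an event or delivery-report callback.
struct EventCallback {
    virtual ~EventCallback() = default;
    virtual void OnEvent(const std::string& event) = 0;
};

// Hypothetical client that keeps only a raw pointer to the callback it is given.
class Client {
public:
    void SetCallback(EventCallback* callback) { callback_ = callback; }  // stores the pointer, no copy
    void Flush() { if (callback_) callback_->OnEvent("flush"); }         // invoked later, e.g. at shutdown
private:
    EventCallback* callback_ = nullptr;
};

struct RecordingCallback : EventCallback {
    std::vector<std::string> events;
    void OnEvent(const std::string& event) override { events.push_back(event); }
};

// The callback is a class member declared before the client, so it is
// constructed first and destroyed last, and is still alive for every Flush().
class Messenger {
public:
    Messenger() { client_.SetCallback(&logger_); }
    // Copying would leave the copy's client pointing at the original's logger.
    Messenger(const Messenger&) = delete;
    Messenger& operator=(const Messenger&) = delete;
    void Disconnect() { client_.Flush(); }
    const std::vector<std::string>& events() const { return logger_.events; }
private:
    RecordingCallback logger_;
    Client client_;
};
```

Had logger_ been a local variable inside a setup method instead, client_ would be left holding a dangling pointer after that method returned, and the later Flush() would call through it, which matches the kind of crash seen in the stack trace.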