
I have a Producer running on the main thread and a Consumer running on its own thread (std::thread). I have a simple program that sends a message using the Producer and then puts the main thread to sleep before trying to send another message.

Whenever my main thread goes to sleep, the program just exits. No exception, nothing. The same thing happens when I try to properly stop and delete my Consumer/Producer. Clearly I'm doing something wrong, but I cannot tell what, since I am not getting any kind of error out of my program. The last log message I see is the one I print right before putting the main thread to sleep.

I've put a try-catch inside main and inside my Consumer thread. I've also called std::set_terminate and added logging there. When my program exits, neither the try-catch nor the terminate handler catches anything.

Any suggestions?

UPDATE #1 [Source]

As Sid S pointed out, I was missing the obvious: the source code.

main.cc

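#include <chrono>
#include <cstdlib>
#include <exception>
#include <future>
#include <iostream>
#include <sstream>
#include <string>
#include <thread>
// Plus the project header declaring KafkaMessenger and MessengerCode (not shown in the post).

int main(int argc, char** argv) {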
  std::cout << "% Main started." << std::endl;

  std::set_terminate([](){
    std::cerr << "% Terminate occurred in main." << std::endl;
    abort();
  });

  try {
    using com::anya::core::networking::KafkaMessenger;
    using com::anya::core::common::MessengerCode;
    KafkaMessenger messenger;

    auto promise = std::promise<bool>();
    auto future = promise.get_future();
    messenger.Connect([&promise](MessengerCode code, std::string& message) {
      promise.set_value(true);
    });
    future.get();

    std::cout << "% Main connection successful." << std::endl;

    // Produce 5 messages 5 seconds apart.
    int number_of_messages_sent = 0;
    while (number_of_messages_sent < 5) {
      std::stringstream message;
      message << "message-" << number_of_messages_sent;

      auto message_send_promise = std::promise<bool>();
      auto message_send_future = message_send_promise.get_future();
      messenger.SendMessage(message.str(), [&message_send_promise](MessengerCode code) {
        std::cout << "% Main message sent" << std::endl;
        message_send_promise.set_value(true);
      });
      message_send_future.get();

      number_of_messages_sent++;
      std::cout << "% Main going to sleep for 5 seconds." << std::endl;
      std::this_thread::sleep_for(std::chrono::seconds(5));
    }

    // Disconnect from Kafka and cleanup.
    auto disconnect_promise = std::promise<bool>();
    auto disconnect_future = disconnect_promise.get_future();
    messenger.Disconnect([&disconnect_promise](MessengerCode code, std::string& message) {
      disconnect_promise.set_value(true);
    });
    disconnect_future.get();
    std::cout << "% Main disconnect complete." << std::endl;
  } catch (std::exception& exception) {
    std::cerr << "% Exception caught in main with error: " << exception.what() << std::endl;
    exit(1);
  }

  std::cout << "% Main exited." << std::endl;
  exit(0);
}

KafkaMessenger.cc [Consumer Section]

void KafkaMessenger::Connect(std::function<void(MessengerCode , std::string&)> impl) {
  assert(!running_.load());
  running_.store(true);

  // For the sake of brevity I've removed a whole bunch of Kafka configuration setup from the sample code.

  RdKafka::ErrorCode consumer_response = consumer_->start(topic_for_consumer, 0, RdKafka::Topic::OFFSET_BEGINNING);

  if (consumer_response != RdKafka::ERR_NO_ERROR) {
    running_.store(false);
    delete consumer_;
    delete producer_;

    error = RdKafka::err2str(consumer_response);
    impl(MessengerCode::CONNECT_FAILED, error);
    return;  // Don't fall through: consumer_ was just deleted and impl must not fire twice.
  }

  auto consumer_thread_started_promise = std::promise<bool>();
  auto consumer_thread_started_future = consumer_thread_started_promise.get_future();
  consumer_thread_ = std::thread([this, &topic_for_consumer, &consumer_thread_started_promise]() {
    try {
      std::cout << "% Consumer thread started." << std::endl;
      consumer_thread_started_promise.set_value(true);

      while (running_.load()) {
        RdKafka::Message* message = consumer_->consume(topic_for_consumer, 0, 5000);

        switch (message->err()) {
          case RdKafka::ERR_NO_ERROR: {
            std::string message_string((char*) message->payload());
            std::cout << "% Consumer received message: " << message_string << std::endl;
            break;
          }
          default:
            std::cerr << "% Consumer consumption failed: " << message->errstr() << " error code=" << message->err() << std::endl;
            break;
        }
        // consume() returns a Message object on every call (including timeouts
        // and errors), so free it on every path, not only on success.
        delete message;
      }

      std::cout << "% Consumer shutting down." << std::endl;
      if (consumer_->stop(topic_for_consumer, 0) != RdKafka::ERR_NO_ERROR) {
        std::cerr << "% Consumer error while trying to stop." << std::endl;
      }
    } catch (std::exception& exception) {
      std::cerr << "% Caught exception in consumer thread: " << exception.what() << std::endl;
    }
  });

  consumer_thread_started_future.get();
  std::string message("Consumer connected");
  impl(MessengerCode::CONNECT_SUCCESS, message);
}
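In `Connect()` above, the consumer thread's lambda captures `topic_for_consumer` by reference. Its declaration is among the elided configuration code, so this is an assumption, but if it is a local variable of `Connect()` (presumably an `RdKafka::Topic*`, since that is what `consume()` takes), the reference dangles as soon as `Connect()` returns while the thread keeps using it on every loop iteration and again in `stop()`. A library-free sketch of the safer shape: copy the pointer into the lambda, keep the stop flag in the object, and join on shutdown. `FakeTopic` and `Worker` are hypothetical stand-ins, not librdkafka types.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <string>
#include <thread>

// Hypothetical stand-in for RdKafka::Topic; only its lifetime matters here.
struct FakeTopic {
  std::string name;
};

// Hypothetical consumer-style worker mirroring the shape of Connect().
class Worker {
 public:
  ~Worker() { Stop(); }

  void Start(FakeTopic* topic) {
    running_.store(true);
    // `topic` is a local of Start(), so copy the pointer into the lambda.
    // Capturing it by reference ([&topic]) would leave the thread holding a
    // dangling reference as soon as Start() returns.
    thread_ = std::thread([this, topic]() {
      while (running_.load()) {
        // The FakeTopic object itself must outlive the thread; only the
        // pointer variable was local.
        name_length_.store(topic->name.size());
        polls_.fetch_add(1);
        std::this_thread::sleep_for(std::chrono::milliseconds(1));
      }
    });
  }

  void Stop() {
    running_.store(false);
    if (thread_.joinable()) thread_.join();
  }

  int polls() const { return polls_.load(); }
  std::size_t name_length() const { return name_length_.load(); }

 private:
  std::atomic<bool> running_{false};
  std::atomic<int> polls_{0};
  std::atomic<std::size_t> name_length_{0};
  std::thread thread_;
};
```

Usage: create the topic first, call `worker.Start(&topic)`, and call `Stop()` (or let the destructor do it) before the topic is destroyed. Copying the pointer fixes the dangling capture, but the pointed-to object still has to outlive the thread.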

KafkaMessenger.cc [Producer Section]

void KafkaMessenger::SendMessage(std::string message, std::function<void(MessengerCode)> impl) {
  assert(running_.load());
  std::cout << "% Producer sending message." << std::endl;

  RdKafka::ErrorCode producer_response = producer_->produce(
      producer_topic_,
      RdKafka::Topic::PARTITION_UA,
      RdKafka::Producer::RK_MSG_COPY,
      static_cast<void*>(&message), message.length(), nullptr, nullptr);

  switch (producer_response) {
    case RdKafka::ERR_NO_ERROR: {
      std::cout << "% Producer Successfully sent (" << message.length() << " bytes)" << std::endl;
      impl(MessengerCode::MESSAGE_SEND_SUCCESS);
      break;
    }
    case RdKafka::ERR__QUEUE_FULL: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
    case RdKafka::ERR__UNKNOWN_PARTITION: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
    case RdKafka::ERR__UNKNOWN_TOPIC: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
    default: {
      std::cerr << "% Sending message failed: " << RdKafka::err2str(producer_response) << std::endl;
      impl(MessengerCode::MESSAGE_SEND_FAILED);
      break;
    }
  }
}
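Also note what the `produce()` call above passes as the payload: `static_cast<void*>(&message)` is the address of the `std::string` object itself, not of its characters, so with `RK_MSG_COPY` the library copies `message.length()` bytes of the string's internal representation. The characters live at `message.data()`. On libc++ (the stack trace further down suggests macOS), a short string's object representation typically starts with a size byte followed by the characters, which would plausibly explain why the consumer printed `message-` instead of `message-0`. A library-free sketch of the difference; `CopyPayload` is a hypothetical stand-in for a copying `produce()`.

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical stand-in for what produce() does with RK_MSG_COPY: copy `len`
// bytes starting at `payload` into a buffer the library owns.
std::vector<char> CopyPayload(const void* payload, std::size_t len) {
  const char* bytes = static_cast<const char*>(payload);
  return std::vector<char>(bytes, bytes + len);
}

// Point at the characters (message.data()), not at the std::string object
// (&message). In the question's real call this corresponds to passing
//   const_cast<char*>(message.data()), message.size()
// to producer_->produce(...) instead of static_cast<void*>(&message).
std::vector<char> CopyMessage(const std::string& message) {
  return CopyPayload(message.data(), message.size());
}
```

The `const_cast` is needed in the real call because that `produce()` overload takes a non-const `void*`, while `data()` returns `const char*` before C++17; with `RK_MSG_COPY` the library only reads the buffer.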

Output

When I run the main method, this is the output that I see in the console:

% Main started.
% Consumer thread started.
% Main connection successful.
% Producer sending message.
% Producer Successfully sent (9 bytes)
% Main message sent
% Main going to sleep for 5 seconds.
% Consumer received message: message-

After closer examination, I do not think the sleep is the cause, because this still happens when I remove the sleep. As you can see in the last log line, the Consumer prints the message it received with the last character truncated: the payload should read message-0. So something, somewhere, is dying.

UPDATE #2 [Stack Trace]

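I came across this old but very useful post about catching signals and printing out the stack (https://stackoverflow.com/questions/77005/how-to-automatically-generate-a-stacktrace-when-my-gcc-c-program-crashes). I implemented that solution, and now I can see more information about where things are crashing.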

Error: signal 11:
0   main                                0x00000001012e4eec _ZN3com4anya4core10networking7handlerEi + 28
1   libsystem_platform.dylib            0x00007fff60511f5a _sigtramp + 26
2   ???                                 0x0000000000000000 0x0 + 0
3   main                                0x00000001012f2866 rd_kafka_poll_cb + 838
4   main                                0x0000000101315fee rd_kafka_q_serve + 590
5   main                                0x00000001012f5d46 rd_kafka_flush + 182
6   main                                0x00000001012e7f1a _ZN3com4anya4core10networking14KafkaMessenger10DisconnectENSt3__18functionIFvNS1_6common13MessengerCodeENS4_12basic_stringIcNS4_11char_traitsIcEENS4_9allocatorIcEEEEEEE + 218
7   main                                0x00000001012dbc45 main + 3221
8   libdyld.dylib                       0x00007fff60290115 start + 1
9   ???                                 0x0000000000000001 0x0 + 1
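A handler along the lines of that post might look like the POSIX sketch below (glibc or macOS, where `<execinfo.h>` provides `backtrace()` and `backtrace_symbols_fd()`). `CrashHandler` and `InstallCrashHandler` are hypothetical names; the asker's own handler is not shown. Output goes through `write()` and `backtrace_symbols_fd()` rather than stdio, which is unsafe inside a signal handler, although `backtrace()` itself is not formally async-signal-safe, so treat this as a debugging aid.

```cpp
#include <execinfo.h>
#include <signal.h>
#include <unistd.h>

// Prints a raw backtrace for a fatal signal, then lets the default action run
// so the process still dies with the original signal (and can dump core).
void CrashHandler(int sig) {
  void* frames[64];
  int count = backtrace(frames, 64);
  const char header[] = "Fatal signal caught, backtrace:\n";
  ssize_t ignored = write(STDERR_FILENO, header, sizeof header - 1);
  (void)ignored;
  backtrace_symbols_fd(frames, count, STDERR_FILENO);
  // SA_RESETHAND has already restored the default action, so re-raising the
  // signal terminates the process with it (right away, or as soon as this
  // handler returns if the signal is blocked while the handler runs).
  raise(sig);
}

bool InstallCrashHandler() {
  // backtrace() may load libgcc lazily (and allocate) on first use; call it
  // once here so the call inside the handler does not have to.
  void* warmup[1];
  backtrace(warmup, 1);

  struct sigaction action {};
  action.sa_handler = CrashHandler;
  sigemptyset(&action.sa_mask);
  action.sa_flags = SA_RESETHAND;
  const int signals[] = {SIGSEGV, SIGBUS, SIGABRT, SIGFPE, SIGILL};
  for (int s : signals) {
    if (sigaction(s, &action, nullptr) != 0) return false;
  }
  return true;
}
```

Call `InstallCrashHandler()` at the top of `main()`. On Linux, linking with `-rdynamic` helps `backtrace_symbols_fd()` print function names instead of bare addresses.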

As part of my shutdown method I call producer_->flush(1000), and this produces the stack trace above. If I remove that call, shutdown completes fine. Clearly I am misconfiguring something that then causes this segfault when I attempt to flush.

UPDATE #3 [Solution]

So it turns out that my classes that handled logging of Kafka events and delivery reports were scoped to a method. This was a problem because librdkafka holds on to these callback objects by reference rather than copying them, so when my main runner method exited and cleanup commenced, these objects had already disappeared. I scoped the loggers to the class level, and this fixed the crash.
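The lifetime rule behind this fix can be shown without Kafka. In the librdkafka C++ API the delivery-report and event callbacks are handed to the configuration as pointers (`conf->set("dr_cb", &dr_cb, errstr)`, `conf->set("event_cb", &event_cb, errstr)`), and the library invokes them later, for example while `flush()` serves pending delivery reports, which matches the `rd_kafka_flush` → `rd_kafka_poll_cb` frames in the trace. The sketch uses hypothetical `DeliveryCallback`, `FakeProducer`, and `Messenger` types in place of `RdKafka::DeliveryReportCb`, `RdKafka::Producer`, and `KafkaMessenger`: the messenger owns its callback as a member declared before the producer, so the producer is destroyed first.

```cpp
#include <memory>
#include <string>
#include <vector>

// Hypothetical stand-in for RdKafka::DeliveryReportCb.
class DeliveryCallback {
 public:
  virtual ~DeliveryCallback() = default;
  virtual void OnDelivery(const std::string& payload) = 0;
};

class DeliveryLogger : public DeliveryCallback {
 public:
  void OnDelivery(const std::string& payload) override { delivered.push_back(payload); }
  std::vector<std::string> delivered;
};

// Hypothetical stand-in for RdKafka::Producer: it only stores a pointer to the
// callback and invokes it later, during Flush().
class FakeProducer {
 public:
  explicit FakeProducer(DeliveryCallback* cb) : cb_(cb) {}
  void Produce(const std::string& payload) { pending_.push_back(payload); }
  void Flush() {
    for (const auto& payload : pending_) cb_->OnDelivery(payload);
    pending_.clear();
  }

 private:
  DeliveryCallback* cb_;  // not owned: whoever registers it must keep it alive
  std::vector<std::string> pending_;
};

class Messenger {
 public:
  // The buggy shape was a local `DeliveryLogger logger;` in a setup method whose
  // address was registered and then left dangling when the method returned.
  Messenger() : producer_(std::make_unique<FakeProducer>(&delivery_logger_)) {}
  void Send(const std::string& payload) { producer_->Produce(payload); }
  void Disconnect() { producer_->Flush(); }
  const std::vector<std::string>& Delivered() const { return delivery_logger_.delivered; }

 private:
  // Declared before producer_: members are destroyed in reverse declaration
  // order, so the producer goes away first and never sees a dead callback.
  DeliveryLogger delivery_logger_;
  std::unique_ptr<FakeProducer> producer_;
};
```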

Przemek Lach
  • You have the source code and are unable to figure it out. Yet you expect us to be able to guess what the problem is without seeing the source code. Edit your question to include a minimal, complete, and verifiable example. – Sid S Mar 19 '18 at 01:36

1 Answer


Kafka message payloads are just binary data: unless you send a string with a trailing nul byte, the payload will not include one. That causes your std::string constructor to read into adjacent memory looking for a nul, possibly accessing unmapped memory, which will crash your application, or at least garble your terminal.

Use the message length together with the payload to construct a std::string that is limited to the actual number of bytes. It will still not be safe to print, but it is a start:

std::string message_string((char*) message->payload(), message->len());
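Building on that, a small helper can both respect the length and make the bytes safe to print by escaping anything that is not printable ASCII. The sketch takes the payload pointer and length as plain arguments (in the consumer loop they would come from `message->payload()` and `message->len()`); `PayloadToString` and `EscapeForLog` are hypothetical helper names.

```cpp
#include <cctype>
#include <cstddef>
#include <cstdio>
#include <string>

// Builds a std::string from exactly `len` bytes: no trailing nul is required,
// and embedded nul bytes are preserved.
std::string PayloadToString(const void* payload, std::size_t len) {
  if (payload == nullptr) return std::string();
  return std::string(static_cast<const char*>(payload), len);
}

// Escapes non-printable bytes as \xNN so binary payloads don't garble the terminal.
std::string EscapeForLog(const std::string& bytes) {
  std::string out;
  for (unsigned char c : bytes) {
    if (std::isprint(c)) {
      out += static_cast<char>(c);
    } else {
      char buf[5];  // "\xNN" plus the terminating nul
      std::snprintf(buf, sizeof buf, "\\x%02X", c);
      out += buf;
    }
  }
  return out;
}
```

In the consumer this would read something like `std::cout << EscapeForLog(PayloadToString(message->payload(), message->len()))`.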
Edenhill
  • Thanks for that tip. Any suggestions for how to deal with crashes that just exit the application without any error? For example, when I attempt to flush the Producer as part of cleanup, the application just quits, which makes it difficult to debug. – Przemek Lach Mar 20 '18 at 03:12
  • Some things to try: Set `ulimit -c unlimited` to make sure core files are generated. Check exit code of your program (`$?`). Run the program with valgrind or build with `-fsanitize=address`. Run your program from gdb. Do printf-debugging. – Edenhill Mar 20 '18 at 06:03
  • So I added a handler that captures all the signals the OS can send to my program, taken from here: https://stackoverflow.com/questions/77005/how-to-automatically-generate-a-stacktrace-when-my-gcc-c-program-crashes. Now I'm actually getting a stack trace, which is a bit more helpful. I'll update my original post with the details. – Przemek Lach Mar 21 '18 at 05:16
  • Can you provide your producer code that does flush()? Is it possible that your DeliveryReportCb has gone out of scope (stack allocated) by the time you call flush()? – Edenhill Mar 21 '18 at 06:49
  • Ya, so my delivery logger and event logger were losing scope. I refactored my code so that the loggers were scoped to the class, and now it works fine. Thanks for the suggestion that led me down the right path, and thanks for creating this awesome library in the first place. – Przemek Lach Mar 21 '18 at 16:08
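Edenhill's suggestion to check the exit code also shows why the try-catch and std::set_terminate in the question stayed silent: a segmentation fault arrives as a signal (SIGSEGV), not as a C++ exception, so it never reaches a catch block or std::terminate. The POSIX-only sketch below (Linux/macOS) runs a function in a forked child guarded by both mechanisms and decodes how the child ended. `RunInChild`, `ShellStyleExitCode`, `Crash`, and `Throw` are hypothetical names, and `std::raise(SIGSEGV)` stands in for a real invalid memory access.

```cpp
#include <csignal>
#include <exception>
#include <stdexcept>
#include <sys/wait.h>
#include <unistd.h>

// Runs `body` in a forked child guarded by both a try/catch block and a
// std::set_terminate handler, mirroring the question's setup, and returns the
// raw status reported by waitpid(), or -1 if fork() or waitpid() fails.
int RunInChild(void (*body)()) {
  pid_t pid = fork();
  if (pid < 0) return -1;
  if (pid == 0) {
    std::signal(SIGSEGV, SIG_DFL);           // ignore any handler inherited from the parent
    std::set_terminate([] { _exit(100); });  // 100: the terminate handler ran
    try {
      body();
    } catch (...) {
      _exit(101);  // 101: an exception reached the catch block
    }
    _exit(0);  // 0: body returned normally
  }
  int status = 0;
  if (waitpid(pid, &status, 0) != pid) return -1;
  return status;
}

// Encodes a wait status the way bash reports it in $?: the exit code for a
// normal exit, 128 + signal number for death by signal.
int ShellStyleExitCode(int status) {
  if (WIFEXITED(status)) return WEXITSTATUS(status);
  if (WIFSIGNALED(status)) return 128 + WTERMSIG(status);
  return -1;
}

// Stand-ins: raise() simulates a segfault without relying on undefined behavior.
void Crash() { std::raise(SIGSEGV); }
void Throw() { throw std::runtime_error("boom"); }
```

The crashing child is reported as killed by SIGSEGV without ever reaching 100 or 101, whereas the throwing child reaches the catch block. In bash, a program killed by signal 11 (as in the trace above) shows up as `$?` equal to 139.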