1

TL;DR

I need a piece of code to multi-thread properly and not return or crash on me. What do I need to do to consume any return or try/catch blocks such that main() does not end?

The story

I have been using cpp with Google protobufs for a short while now and I have run into an issue that is not solved by my attempts of a solution. I started learning server-client socket programs with google protobufs and cpp from this link. I first converted the .proto file to proto3 by editing the .proto file. I was then able to create a CMake file to compile the program.

My issue

I need to make a new client program such that, when the server.cpp is canceled with Ctrl+c, the client program does NOT crash, quit, seg fault, etc. Basically, when the server dies, the client should wait until the server comes back online.

What have I tried?

I have tried 2 things:

  1. I put the entire socket creation, including variable creation, into a massive while loop. I then had a delay at the beginning such that, if the connection could not be made, the program waits and then tries again.
  2. Multithreading with boost

I have tried to use boost::thread and boost::future to create a boost::async. I had a loop in main() that would create a vector (of length 1) of boost::async() objects with the entire rest of the program called in a method passed into the boost::async() constructor. I then simply called the boost::wait_for_all() on the iterator with begin() and end(). This is where I got the futures idea. Here is an example of the syntax:

while (true)
{
    printingmutex.lock();
    cout << "------------------\nStarting initialization of socket.\n" << endl;
    printingmutex.unlock();
    // Now I create the threads.
    std::vector<boost::future<void>> futures;
    futures.push_back(boost::async([host_name]{ doSocketswag(host_name); }));
    boost::wait_for_all(futures.begin(), futures.end());
    // Delay function after disconnection, implementation irrelevant to the question
    sleepApp(2000); // 2 seconds
}

Just for brevity, here is the new .proto file for conversion to proto3:

syntax = "proto3";
message log_packet {
  fixed64 log_time = 1;
  fixed32 log_micro_sec = 2;
  fixed32 sequence_no = 3;
  fixed32 shm_app_id = 4;
  string packet_id = 5;
  string log_level= 6;
  string log_msg= 7;
}

Why is this important?

This is important because, when I have this issue solved, I will be porting the client program to other platforms besides my Ubuntu box, but I will still be using my Ubuntu box, and I need to be able to run a program that operates outside of the sending of messages. I will also be able to add a GUI to the program, so I will need to be able to split the socket thread from the rest of the program, and therefore prevent bad connections from killing the client program.

In conclusion

How do I make the client program multithreaded such that the socket communication does not

  1. kill the program
  2. return up through main()
  3. kill only the socket thread, and nothing else?

A basic, no frills implementation that shows the sockets running separate from the main program will suffice.

Also, to speed up a response, I have a CMake file to compile the client and server and .proto file with. I am sharing it so you guys can spend more time on actually answering the question than dinking around with compiling code.

# Compiling the program, assuming all of the source files are in the same directory:
# mkdir build
# cd build
# cmake ..
# cmake --build . -- -j8 -l8

set(PROJECT_NAME CppProtobufdemo)
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
project(${PROJECT_NAME} LANGUAGES CXX)

set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)

add_definitions(-std=c++11)
# CMAKE_CURRENT_SOURCE_DIR - This is the directory where CMake is running.
set(CMAKE_BINARY_DIR ${CMAKE_SOURCE_DIR}/build)
set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR})
set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/lib)

find_package( Boost REQUIRED system thread timer chrono)
if(Boost_FOUND)
    message("Boost was found.")
    message(${Boost_INCLUDE_DIR})
endif()

include_directories(${CMAKE_CURRENT_BINARY_DIR})

set(THE_USER $ENV{USER})
message("This is the com user_:" ${THE_USER})

set(PROGRAMS_TO_COMPILE
server
client
)

# These lines are for autogenerating code from the *.proto files with CMake.
find_package(Protobuf REQUIRED)
if(PROTOBUF_FOUND)
    message("Google Protobuf has been found.")
endif()
include_directories(${Protobuf_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
# https://cmake.org/cmake/help/git-master/module/FindProtobuf.html
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS message.proto)
# I want to see if I can access these proto files from python scripts
protobuf_generate_python(PROTO_PY message.proto)

foreach(aprogram ${PROGRAMS_TO_COMPILE})
    add_executable(${aprogram} ${PROTO_SRCS} ${PROTO_HDRS} ${PROJECT_SOURCE_DIR}/${aprogram}.cpp )
    target_link_libraries(${aprogram} ${Boost_LIBRARIES})
    target_link_libraries(${aprogram} ${PROTOBUF_LIBRARIES})
endforeach()

Another thing: I have always gotten rid of the goto FINISH statements in the process.

If you have any comments or follow up questions, I am happy to respond.

Edit:

I have tried wrapping this line

boost::wait_for_all(futures.begin(), futures.end());

in a try catch like this:

try {
    // Now that I have created all of the threads, I need to wait for them to finish.
        boost::wait_for_all(futures.begin(), futures.end());
    }
    catch (const std::exception& e)
    {
    // Just keep swimming
    cout << "Thread died" << endl;
}

And now the error messages to not get pushed up. Instead, the program just quits, without an error or any message at all. I am using return statements to kill the threaded method, but they should not be killing the main() thread - unless there is something else I am missing. Here is the code for the client that I am using to demo this "fix":

// This is for the futures functionality
#define BOOST_THREAD_VERSION 4
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
// cmake --build . -- -j8 -l8
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
//
#include <boost/thread.hpp>
#include <chrono>
#include <thread>

using namespace google::protobuf::io;
using namespace std;

void doSocketSwag(void);

void sleepApp(int millis)
{
    std::this_thread::sleep_for(std::chrono::milliseconds{millis});
}

// Boost sleep function
void wait(int milliseconds)
{
    boost::this_thread::sleep_for(boost::chrono::milliseconds{milliseconds});
}

int main(int argv, char** argc)
{
    // I will put this whole thing into a while loop
    while (true)
    {
        cout << "Creating threads." << endl;
        std::vector<boost::future<void>> futures;
        // The socket thread
        futures.push_back(boost::async([]{ doSocketSwag(); }));
        // Now I need to catch exceptions
        try {
            // Now that I have created all of the threads, I need to wait for them to finish.
            boost::wait_for_all(futures.begin(), futures.end());
        }
        catch (const std::exception& e)
        {
            // Just keep swimming
            cout << "Thread died" << endl;
        }
    }
    //delete pkt;
    //FINISH:
    //close(hsock);
    return 0;
}

//
void doSocketSwag(void)
{
    log_packet payload;
    payload.set_log_time(10);
    payload.set_log_micro_sec(10);
    payload.set_sequence_no(1);
    payload.set_shm_app_id(101);
    payload.set_packet_id("TST");
    payload.set_log_level("DEBUG");
    payload.set_log_msg("What shall we say then");
    //cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
    int siz = payload.ByteSize()+4;
    char *pkt = new char [siz];
    google::protobuf::io::ArrayOutputStream aos(pkt,siz);
    CodedOutputStream *coded_output = new CodedOutputStream(&aos);
    coded_output->WriteVarint32(payload.ByteSize());
    payload.SerializeToCodedStream(coded_output);

    int host_port= 1101;
    char* host_name="127.0.0.1";

    struct sockaddr_in my_addr;

    char buffer[1024];
    int bytecount;
    int buffer_len=0;

    int hsock;
    int * p_int;
    int err;

    hsock = socket(AF_INET, SOCK_STREAM, 0);
    if(hsock == -1){
        printf("Error initializing socket %d\n",errno);
        //goto FINISH;
        return;
    }

    p_int = (int*)malloc(sizeof(int));
    *p_int = 1;

    if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 ) || (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
        printf("Error setting options %d\n",errno);
        free(p_int);
        //goto FINISH;
        return;
    }
    free(p_int);

    my_addr.sin_family = AF_INET ;
    my_addr.sin_port = htons(host_port);

    memset(&(my_addr.sin_zero), 0, 8);
    my_addr.sin_addr.s_addr = inet_addr(host_name);
    if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
        if((err = errno) != EINPROGRESS){
            fprintf(stderr, "Error connecting socket %d\n", errno);
            //goto FINISH;
            return;
        }
    }

    while (true)
    {
        if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) {
            // THIS is where the program dies.
            fprintf(stderr, "Error sending data %d\n", errno);
            //goto FINISH;
            break;
        }
        //printf("Sent bytes %d\n", bytecount);
        //usleep(1);
        sleepApp(1000);
        cout << "Thread slept for 1 second." << endl;
    }
}

If the return statements are that bad, how can I replace them?

Community
  • 1
  • 1
robotsfoundme
  • 418
  • 4
  • 18
  • It's unclear to me why would the client crash if the server goes offline? Does `doSocketswag` throw if the socket dies? If so, can't you catch it? – kabanus Jul 14 '19 at 18:03
  • The client is in this code block: `if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) { fprintf(stderr, "Error sending data %d\n", errno); goto FINISH; }` and it tells me that it errors out, gives me an error number, and then quits the program. I should also say that the above pasted code block is in a while(true) loop, but that does not change the general logic. Also, in my personal code, I have a lot of other logic going on, but the original SO post linked above provides the same idea for proof of concept. I should also mention that I got rid of the `goto FINISH` in my original program. – robotsfoundme Jul 14 '19 at 21:46
  • I know **where** the code is failing. I **do not** know _what_ to do to fix it. A debugger is not going to help me figure out _what_ to _replace_ code with @MaxVollmer. – robotsfoundme Jul 15 '19 at 00:07
  • Maybe I am too tired, but even after rereading your question I don't see any information on where the code is failing. Can you either add it or make it more clear? – Max Vollmer Jul 15 '19 at 00:18
  • Probably a combination of all of the above... I have been there too. – robotsfoundme Jul 15 '19 at 00:23
  • I am using Cmake to build the code on a Ubuntu 1604 OS. I am compiling my program through the top 5 lines provided in the CMake file (as comments). I am running the code on the terminal with `./server` and `./client`. I am testing the program by starting up both the client and the server and then killing the server with `Ctrl+c`. If the client stays alive, my problem is solved. If the client dies, then I am still working on a solution. – robotsfoundme Jul 15 '19 at 00:50
  • The issue is when the `boost::wait_for_all(futures.begin(), futures.end());` call is ended because the client cannot send data with `(bytecount=send(hsock, (void *) pkt,siz,0))== -1 )`. It returns `-1` and then the socket is terminated. Somehow, I need a way to, instead of destroying everything, maybe just close the socket and somehow handle logic so that I can restart the socket thread in the `while()` loop in the thread vector and go merrily on my way. – robotsfoundme Jul 15 '19 at 00:52
  • I suspect you might be running into a SIGPIPE signal, which by default is raised when a TCP connection is closed while your program is trying to send data to it. To avoid a call to `abort()` when that happens, you can either handle the SIGPIPE signal in a signal-handler, or better yet, tell the OS you want to ignore the SIGPIPE signal entirely so that you can just handle `send()` returning -1 instead. See: https://stackoverflow.com/questions/108183/how-to-prevent-sigpipes-or-handle-them-properly – Jeremy Friesner Jul 15 '19 at 01:41
  • I think @JeremyFriesner has the right of it - try writing a quick signal handler that just prints your in it to know for sure, and then you can decide on the best way forward. – kabanus Jul 15 '19 at 04:35
  • The last 2 comments seem the most related to my problem. It must be a signal interrupt of sorts because no sort of loop control logic that I implement seems to address the issue. I will research more into `SIGPIPE` and its interrupting the signal. I guess I do not know enough about sockets to know to look something like that up before going to SO for help. – robotsfoundme Jul 15 '19 at 13:44
  • Hey @MaxVollmer can you remove the duplicate question thing? That is no longer necessary. – robotsfoundme Jul 26 '19 at 18:08

1 Answers1

0

Here we go guys

After reviewing the link provided by Jeremy Friesner, I discovered that this issue is most definitely not original. It turns out that sockets are able to publish signals to interrupt a cpp program to require handling by the user. The link provided explains how that works. Basically, I simply had to handle an interrupt condition in order for the program to continue: the logic in the program was correct, but I did not know enough about sockets to know to ask about SIGPIPE and SIG_IGN to solve my problem. I have displayed below 2 examples of how to solve the problem, similar to Jeremy Friesner‘s link. The multi-threading also works properly now, and it is super awesome to watch.

Here is the example where I directly copy the solution from the link:

// This is for the futures functionality
//#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_FUTURE
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
// cmake --build . -- -j8 -l8
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
//
#include <boost/thread.hpp>
#include <chrono>
#include <thread>
#include <signal.h>

using namespace google::protobuf::io;
using namespace std;

void doSocketSwag(void);

void sleepApp(int millis)
{
    std::this_thread::sleep_for(std::chrono::milliseconds{millis});
}

// Boost sleep function
void wait(int milliseconds)
{
    boost::this_thread::sleep_for(boost::chrono::milliseconds{milliseconds});
}

log_packet payload;
int siz;
char *pkt;
google::protobuf::io::ArrayOutputStream * aos;
CodedOutputStream *coded_output;

int host_port= 1101;
char* host_name="127.0.0.1";

struct sockaddr_in my_addr;

char buffer[1024];
int bytecount;
int buffer_len=0;

int hsock;
int * p_int;
int err;

// This boolean controls whether of not the socket thread loops.
std::atomic_bool dosockets{true};

// This is the mutex for printing
std::mutex printlock;

// Try to do client socket program without server.
int main(int argv, char** argc)
{
    // Registering the singal PIPE ignore
    signal(SIGPIPE, SIG_IGN);

    // We build the packet.
    payload.set_log_time(10);
    payload.set_log_micro_sec(10);
    payload.set_sequence_no(1);
    payload.set_shm_app_id(101);
    payload.set_packet_id("TST");
    payload.set_log_level("DEBUG");
    payload.set_log_msg("What shall we say then");

    // Now I fill in the socket fields
    siz = payload.ByteSize()+4;
    pkt = new char [siz];
    aos = new google::protobuf::io::ArrayOutputStream(pkt,siz);
    coded_output = new CodedOutputStream(aos);

    // Now we do methods on the objects
    coded_output->WriteVarint32(payload.ByteSize());
    payload.SerializeToCodedStream(coded_output);

    // I will put this whole thing into a while loop
    while (true)
    {
        printlock.lock();
        cout << "Creating threads." << endl;
        printlock.unlock();
        std::vector<boost::future<void>> futures;
        // The socket thread
        futures.push_back(boost::async([]{ doSocketSwag(); }));
        boost::wait_for_all(futures.begin(), futures.end());
    }
    //delete pkt;
    //FINISH:
    //close(hsock);
    return 0;
}

//
void doSocketSwag(void)
{
    // 
    while (dosockets)
    {
        printlock.lock();
        cout << "Waiting to try again." << endl;
        printlock.unlock();
        sleepApp(1000);

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
            printlock.lock();
            printf("Error initializing socket %d\n",errno);
            printlock.unlock();
            //goto FINISH;
            continue;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 ) || (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
            printlock.lock();
            printf("Error setting options %d\n",errno);
            printlock.unlock();
            free(p_int);
            //goto FINISH;
            continue;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = inet_addr(host_name);
        if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
            if((err = errno) != EINPROGRESS)
            {
                printlock.lock();
                fprintf(stderr, "Error connecting socket %d\n", errno);
                printlock.unlock();
                //goto FINISH;
                continue;
            }
        }

        while (true)
        {
            if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 )
            {
                printlock.lock();
                fprintf(stderr, "Error sending data %d\n", errno);
                printlock.unlock();
                //goto FINISH;
                //break;
                close(hsock);
                break;
            }
            //printf("Sent bytes %d\n", bytecount);
            //usleep(1);
            sleepApp(1000);
            printlock.lock();
            cout << "Thread slept for 1 second." << endl;
            printlock.unlock();
        }
    }
}

And then here is the example where I create a callback function when the SIGPIPE signal gets raised:

// This is for the futures functionality
//#define BOOST_THREAD_VERSION 4
#define BOOST_THREAD_PROVIDES_FUTURE
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
// cmake --build . -- -j8 -l8
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
//
#include <boost/thread.hpp>
#include <chrono>
#include <thread>
#include <signal.h>

using namespace google::protobuf::io;
using namespace std;

void doSocketSwag(void);

void sleepApp(int millis)
{
    std::this_thread::sleep_for(std::chrono::milliseconds{millis});
}

// Boost sleep function
void wait(int milliseconds)
{
    boost::this_thread::sleep_for(boost::chrono::milliseconds{milliseconds});
}

log_packet payload;
int siz;
char *pkt;
google::protobuf::io::ArrayOutputStream * aos;
CodedOutputStream *coded_output;

int host_port= 1101;
char* host_name="127.0.0.1";

struct sockaddr_in my_addr;

char buffer[1024];
int bytecount;
int buffer_len=0;

int hsock;
int * p_int;
int err;

// This boolean controls whether of not the socket thread loops.
std::atomic_bool dosockets{true};

// This is the mutex for printing
std::mutex printlock;

// The signal pipe error
void my_handler(int input)
{
    // Simply print
    printlock.lock();
    cout << "I got into the pipe handler LOOK AT ME WOOO HOOO." << endl;
    printlock.unlock();
}

// Try to do client socket program without server.
int main(int argv, char** argc)
{
    // Registering the singal PIPE ignore
    struct sigaction sigIntHandler;
    sigIntHandler.sa_handler = my_handler;
    sigemptyset(&sigIntHandler.sa_mask);
    sigIntHandler.sa_flags = 0;
    sigaction(SIGPIPE, &sigIntHandler, NULL);

    // We build the packet.
    payload.set_log_time(10);
    payload.set_log_micro_sec(10);
    payload.set_sequence_no(1);
    payload.set_shm_app_id(101);
    payload.set_packet_id("TST");
    payload.set_log_level("DEBUG");
    payload.set_log_msg("What shall we say then");

    // Now I fill in the socket fields
    siz = payload.ByteSize()+4;
    pkt = new char [siz];
    aos = new google::protobuf::io::ArrayOutputStream(pkt,siz);
    coded_output = new CodedOutputStream(aos);

    // Now we do methods on the objects
    coded_output->WriteVarint32(payload.ByteSize());
    payload.SerializeToCodedStream(coded_output);

    // I will put this whole thing into a while loop
    while (true)
    {
        printlock.lock();
        cout << "Creating threads." << endl;
        printlock.unlock();
        std::vector<boost::future<void>> futures;
        // The socket thread
        futures.push_back(boost::async([]{ doSocketSwag(); }));
        boost::wait_for_all(futures.begin(), futures.end());
    }
    //delete pkt;
    //FINISH:
    //close(hsock);
    return 0;
}

//
void doSocketSwag(void)
{
    // 
    while (dosockets)
    {
        printlock.lock();
        cout << "Waiting to try again." << endl;
        printlock.unlock();
        sleepApp(1000);

        hsock = socket(AF_INET, SOCK_STREAM, 0);
        if(hsock == -1){
            printlock.lock();
            printf("Error initializing socket %d\n",errno);
            printlock.unlock();
            //goto FINISH;
            continue;
        }

        p_int = (int*)malloc(sizeof(int));
        *p_int = 1;

        if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 ) || (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
            printlock.lock();
            printf("Error setting options %d\n",errno);
            printlock.unlock();
            free(p_int);
            //goto FINISH;
            continue;
        }
        free(p_int);

        my_addr.sin_family = AF_INET ;
        my_addr.sin_port = htons(host_port);

        memset(&(my_addr.sin_zero), 0, 8);
        my_addr.sin_addr.s_addr = inet_addr(host_name);
        if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
            if((err = errno) != EINPROGRESS)
            {
                printlock.lock();
                fprintf(stderr, "Error connecting socket %d\n", errno);
                printlock.unlock();
                //goto FINISH;
                continue;
            }
        }

        while (true)
        {
            if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 )
            {
                printlock.lock();
                fprintf(stderr, "Error sending data %d\n", errno);
                printlock.unlock();
                //goto FINISH;
                //break;
                close(hsock);
                break;
            }
            //printf("Sent bytes %d\n", bytecount);
            //usleep(1);
            sleepApp(1000);
            printlock.lock();
            cout << "Thread slept for 1 second." << endl;
            printlock.unlock();
        }
    }
}

The server code remains unchanged from the original link provided in the beginning of the question.

Thank you guys for the comments. They directed me to my answer and a whole new area of cpp I did not know what a thing: signals.

(If you guys want to continue commenting, that is fine as well. I am considering this question as answered.)

robotsfoundme
  • 418
  • 4
  • 18