TL;DR
I need a piece of code to multi-thread properly and not return or crash on me. What do I need to do to consume any return
or try/catch
blocks such that main()
does not end?
The story
I have been using cpp
with Google protobufs for a short while now and I have run into an issue that is not solved by my attempts of a solution. I started learning server-client socket programs with google protobufs and cpp
from this link. I first converted the .proto
file to proto3
by editing the .proto
file. I was then able to create a CMake file to compile the program.
My issue
I need to make a new client program such that, when the server.cpp
is canceled with Ctrl+c
, the client program does NOT crash, quit, seg fault, etc. Basically, when the server dies, the client should wait until the server comes back online.
What have I tried?
I have tried 2 things:
- I put the entire socket creation, including variable creation, into a massive while loop. I then had a delay at the beginning such that, if the connection could not be made, the program waits and then tries again.
- Multithreading with
boost
I have tried to use boost::thread
and boost::future
to create a boost::async
. I had a loop in main()
that would create a vector (of length 1) of boost::async()
objects with the entire rest of the program called in a method passed into the boost::async()
constructor. I then simply called the boost::wait_for_all()
on the iterator with begin()
and end()
. This is where I got the futures idea. Here is an example of the syntax:
while (true)
{
printingmutex.lock();
cout << "------------------\nStarting initialization of socket.\n" << endl;
printingmutex.unlock();
// Now I create the threads.
std::vector<boost::future<void>> futures;
futures.push_back(boost::async([host_name]{ doSocketswag(host_name); }));
boost::wait_for_all(futures.begin(), futures.end());
// Delay function after disconnection, implementation irrelevant to the question
sleepApp(2000); // 2 seconds
}
Just for brevity, here is the new .proto
file for conversion to proto3
:
syntax = "proto3";
message log_packet {
fixed64 log_time = 1;
fixed32 log_micro_sec = 2;
fixed32 sequence_no = 3;
fixed32 shm_app_id = 4;
string packet_id = 5;
string log_level= 6;
string log_msg= 7;
}
Why is this important?
This is important because, when I have this issue solved, I will be porting the client program to other platforms besides my Ubuntu box, but I will still be using my Ubuntu box, and I need to be able to run a program that operates outside of the sending of messages. I will also be able to add a GUI
to the program, so I will need to be able to split the socket thread from the rest of the program, and therefore prevent bad connections from killing the client program.
In conclusion
How do I make the client program multithreaded such that the socket communication does not
- kill the program
- return up through
main()
- kill only the socket thread, and nothing else?
A basic, no frills implementation that shows the sockets running separate from the main program will suffice.
Also, to speed up a response, I have a CMake file to compile the client
and server
and .proto
file with. I am sharing it so you guys can spend more time on actually answering the question than dinking around with compiling code.
# Compiling the program, assuming all of the source files are in the same directory:
# mkdir build
# cd build
# cmake ..
# cmake --build . -- -j8 -l8
set(PROJECT_NAME CppProtobufdemo)
cmake_minimum_required(VERSION 3.5 FATAL_ERROR)
project(${PROJECT_NAME} LANGUAGES CXX)
set(CMAKE_CXX_STANDARD 14)
set(CMAKE_CXX_STANDARD_REQUIRED ON)
add_definitions(-std=c++11)
# CMAKE_CURRENT_SOURCE_DIR - This is the directory where CMake is running.
set(CMAKE_BINARY_DIR ${CMAKE_SOURCE_DIR}/build)
set(EXECUTABLE_OUTPUT_PATH ${CMAKE_BINARY_DIR})
set(LIBRARY_OUTPUT_PATH ${CMAKE_BINARY_DIR}/lib)
find_package( Boost REQUIRED system thread timer chrono)
if(Boost_FOUND)
message("Boost was found.")
message(${Boost_INCLUDE_DIR})
endif()
include_directories(${CMAKE_CURRENT_BINARY_DIR})
set(THE_USER $ENV{USER})
message("This is the com user_:" ${THE_USER})
set(PROGRAMS_TO_COMPILE
server
client
)
# These lines are for autogenerating code from the *.proto files with CMake.
find_package(Protobuf REQUIRED)
if(PROTOBUF_FOUND)
message("Google Protobuf has been found.")
endif()
include_directories(${Protobuf_INCLUDE_DIRS})
include_directories(${CMAKE_CURRENT_BINARY_DIR})
# https://cmake.org/cmake/help/git-master/module/FindProtobuf.html
protobuf_generate_cpp(PROTO_SRCS PROTO_HDRS message.proto)
# I want to see if I can access these proto files from python scripts
protobuf_generate_python(PROTO_PY message.proto)
foreach(aprogram ${PROGRAMS_TO_COMPILE})
add_executable(${aprogram} ${PROTO_SRCS} ${PROTO_HDRS} ${PROJECT_SOURCE_DIR}/${aprogram}.cpp )
target_link_libraries(${aprogram} ${Boost_LIBRARIES})
target_link_libraries(${aprogram} ${PROTOBUF_LIBRARIES})
endforeach()
Another thing: I have always gotten rid of the goto FINISH
statements in the process.
If you have any comments or follow up questions, I am happy to respond.
Edit:
I have tried wrapping this line
boost::wait_for_all(futures.begin(), futures.end());
in a try catch like this:
try {
// Now that I have created all of the threads, I need to wait for them to finish.
boost::wait_for_all(futures.begin(), futures.end());
}
catch (const std::exception& e)
{
// Just keep swimming
cout << "Thread died" << endl;
}
And now the error messages to not get pushed up. Instead, the program just quits, without an error or any message at all.
I am using return
statements to kill the threaded method, but they should not be killing the main()
thread - unless there is something else I am missing. Here is the code for the client that I am using to demo this "fix":
// This is for the futures functionality
#define BOOST_THREAD_VERSION 4
//
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <errno.h>
#include <string.h>
#include <netdb.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <arpa/inet.h>
// cmake --build . -- -j8 -l8
#include "message.pb.h"
#include <iostream>
#include <google/protobuf/message.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
//
#include <boost/thread.hpp>
#include <chrono>
#include <thread>
using namespace google::protobuf::io;
using namespace std;
void doSocketSwag(void);
void sleepApp(int millis)
{
std::this_thread::sleep_for(std::chrono::milliseconds{millis});
}
// Boost sleep function
void wait(int milliseconds)
{
boost::this_thread::sleep_for(boost::chrono::milliseconds{milliseconds});
}
int main(int argv, char** argc)
{
// I will put this whole thing into a while loop
while (true)
{
cout << "Creating threads." << endl;
std::vector<boost::future<void>> futures;
// The socket thread
futures.push_back(boost::async([]{ doSocketSwag(); }));
// Now I need to catch exceptions
try {
// Now that I have created all of the threads, I need to wait for them to finish.
boost::wait_for_all(futures.begin(), futures.end());
}
catch (const std::exception& e)
{
// Just keep swimming
cout << "Thread died" << endl;
}
}
//delete pkt;
//FINISH:
//close(hsock);
return 0;
}
//
void doSocketSwag(void)
{
log_packet payload;
payload.set_log_time(10);
payload.set_log_micro_sec(10);
payload.set_sequence_no(1);
payload.set_shm_app_id(101);
payload.set_packet_id("TST");
payload.set_log_level("DEBUG");
payload.set_log_msg("What shall we say then");
//cout<<"size after serilizing is "<<payload.ByteSize()<<endl;
int siz = payload.ByteSize()+4;
char *pkt = new char [siz];
google::protobuf::io::ArrayOutputStream aos(pkt,siz);
CodedOutputStream *coded_output = new CodedOutputStream(&aos);
coded_output->WriteVarint32(payload.ByteSize());
payload.SerializeToCodedStream(coded_output);
int host_port= 1101;
char* host_name="127.0.0.1";
struct sockaddr_in my_addr;
char buffer[1024];
int bytecount;
int buffer_len=0;
int hsock;
int * p_int;
int err;
hsock = socket(AF_INET, SOCK_STREAM, 0);
if(hsock == -1){
printf("Error initializing socket %d\n",errno);
//goto FINISH;
return;
}
p_int = (int*)malloc(sizeof(int));
*p_int = 1;
if( (setsockopt(hsock, SOL_SOCKET, SO_REUSEADDR, (char*)p_int, sizeof(int)) == -1 ) || (setsockopt(hsock, SOL_SOCKET, SO_KEEPALIVE, (char*)p_int, sizeof(int)) == -1 ) ){
printf("Error setting options %d\n",errno);
free(p_int);
//goto FINISH;
return;
}
free(p_int);
my_addr.sin_family = AF_INET ;
my_addr.sin_port = htons(host_port);
memset(&(my_addr.sin_zero), 0, 8);
my_addr.sin_addr.s_addr = inet_addr(host_name);
if( connect( hsock, (struct sockaddr*)&my_addr, sizeof(my_addr)) == -1 ){
if((err = errno) != EINPROGRESS){
fprintf(stderr, "Error connecting socket %d\n", errno);
//goto FINISH;
return;
}
}
while (true)
{
if( (bytecount=send(hsock, (void *) pkt,siz,0))== -1 ) {
// THIS is where the program dies.
fprintf(stderr, "Error sending data %d\n", errno);
//goto FINISH;
break;
}
//printf("Sent bytes %d\n", bytecount);
//usleep(1);
sleepApp(1000);
cout << "Thread slept for 1 second." << endl;
}
}
If the return
statements are that bad, how can I replace them?