
I am new to socket programming, and I have run into a problem that I cannot solve. I have read in several sources that the C++ Standard Template Library (STL) containers are not thread-safe, so the programmer has to provide a mechanism that ensures several threads do not modify a container's data concurrently.

For instance, see the question "Thread safety std::vector push_back and reserve".

I have used the std::mutex class to make sure that no two threads write to the same container at the same time. However, this does not work for me when I use sockets.

Suppose I have 4 clients, each one sending data (int) to the server in the following order:

client_0: 4
client_1: 8
client_2: 5
client_3: 7

Observe the following code for a simple server:

#define PORT 60000

#include <unistd.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <cstdio>   // perror
#include <cstdlib>  // exit, EXIT_FAILURE
#include <vector>
#include <string>
#include <iostream>
#include <mutex>

using namespace std;

vector<int> inputQueue; //<--------!
mutex mtx; //<---------------------!

void printVector(vector<int> input) {
    cout << "inputQueue: [";
    for (unsigned int i = 0; i < input.size(); i++ ) {
        if (i != input.size() - 1)
            cout << input[i] << ", ";
        else
            cout << input[i];
    }
    cout << "]." << endl;
}

int main(int argc, char const *argv[])
{
    int server_fd, client_fd;
    struct sockaddr_in address;
    int opt = 1;
    int addrlen = sizeof(address);

    if ((server_fd = socket(AF_INET, SOCK_STREAM, 0)) < 0) { // socket() returns -1 on failure
        perror("socket failed");
        exit(EXIT_FAILURE);
    }
    if (setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt))) {
        perror("setsockopt");
        exit(EXIT_FAILURE);
    }
    address.sin_family = AF_INET;
    address.sin_addr.s_addr = INADDR_ANY;
    address.sin_port = htons( PORT );
    if (bind(server_fd, (struct sockaddr *)&address, sizeof(address))<0) {
        perror("bind failed");
        exit(EXIT_FAILURE);
    }
    if (listen(server_fd, 10) < 0) {
        perror("listen");
        exit(EXIT_FAILURE);
    }
    while(1) {
        char buffer[4];
        if ((client_fd = accept(server_fd, (struct sockaddr *)&address, (socklen_t*)&addrlen))<0) {
            perror("accept");
            exit(EXIT_FAILURE);
        }
        if (!fork()) {
            recv(client_fd, buffer, 4, MSG_WAITALL);
            int receivedInt = int(
                    (unsigned char)(buffer[0]) << 24 |
                    (unsigned char)(buffer[1]) << 16 |
                    (unsigned char)(buffer[2]) << 8 |
                    (unsigned char)(buffer[3])
            );
            mtx.lock(); //<-------------------------------------!
            inputQueue.push_back(receivedInt); //<--------------!
            cout << "Client context. Integer registered: " << receivedInt << ": inputQueue length is " << inputQueue.size() << endl;
            printVector(inputQueue); //<------------------------!
            mtx.unlock(); //<-----------------------------------!
            close(server_fd); close(client_fd);
        }
        cout << "Server context: inputQueue length is " << inputQueue.size() << endl;
        printVector(inputQueue);
    }
    return 0;
}

The server must receive the data in the same order in which the clients send it and register each value in a vector of integers, std::vector<int> inputQueue, using the push_back() method, so that inputQueue = {4, 8, 5, 7} once all the clients' data has been received.

I should clarify that inputQueue is a global variable that contains no elements when the server starts; elements are added as the clients register.

The problem is that no client manages to register elements in inputQueue. Notice that, depending on where the cout << statement is placed, the reported inputQueue size differs. Within the client context, each client seems to overwrite the first element of inputQueue, but outside it none of the clients is able to register a single element.

Apparently, each socket has its own copy of inputQueue, so when the socket is destroyed, its modified copy of inputQueue is destroyed as well.

The output is as follows:

Server context: inputQueue length is 0
inputQueue: [].
Client context. Integer registered: 4: inputQueue length is 1
inputQueue: [4].
Server context: inputQueue length is 1
inputQueue: [4].
Server context: inputQueue length is 0
inputQueue: [].
Client context. Integer registered: 8: inputQueue length is 1
inputQueue: [8].
Server context: inputQueue length is 0
inputQueue: [].
Server context: inputQueue length is 1
inputQueue: [8].
Client context. Integer registered: 5: inputQueue length is 1
inputQueue: [5].
Server context: inputQueue length is 1
inputQueue: [5].
Server context: inputQueue length is 0
inputQueue: [].
Client context. Integer registered: 7: inputQueue length is 1
inputQueue: [7].
Server context: inputQueue length is 1
inputQueue: [7].

Does anyone have any idea why this happens and how it could be solved? I hope you can help me. Thank you.

Michael Chourdakis
Vitrion
  • `fork` creates a new process. Processes do not share memory space with each other. – Miles Budnek Jun 19 '19 at 22:12
  • @MilesBudnek, so what would you suggest? – Vitrion Jun 19 '19 at 22:15
  • Suggestion: See if you can avoid threads entirely here. You can synchronize multiple sockets with [`select`](http://man7.org/linux/man-pages/man2/select.2.html) and [`epoll`](http://man7.org/linux/man-pages/man7/epoll.7.html). Here is a simple select-based server: https://www.gnu.org/software/libc/manual/html_node/Server-Example.html – user4581301 Jun 19 '19 at 22:16
  • Instead of fork, try using std::thread. Do not forget to either join or detach the threads; see https://en.cppreference.com/w/cpp/thread/thread/thread – CuriouslyRecurringThoughts Jun 19 '19 at 22:18
  • Thank you very much, guys. I'll try your suggestions right now and let you know later whether I could solve this problem... – Vitrion Jun 19 '19 at 22:23
  • @Vitrion `mtx.lock(); stuff; mtx.unlock()` -- If `stuff` throws an exception, you have a permanently locked mutex. Use `lock_guard` or some other form of RAII. – PaulMcKenzie Jun 19 '19 at 22:36
  • Also, you can't combine multiple options in a single `setsockopt()` call. You need to call `setsockopt()` for `SO_REUSEADDR` and `SO_REUSEPORT` individually. – Remy Lebeau Jun 20 '19 at 00:39

2 Answers

if (!fork()) {

fork() creates a completely new, independent process with its own virtual memory address space. The shown code, apparently, expects both the child process and the original process to be interacting through the same object, namely a vector, locked by a mutex.

That's not what happens. You now have two completely independent processes. This is no different than running your program twice, at the same time or in quick succession. Do you expect both running copies of your program to somehow share the same vector and mutex? Of course not.

What you are looking to do, instead, is to use std::thread to create a new execution thread in the same process. Your C++ book should have more information on how to create new execution threads with std::thread.

Furthermore, even if you replace the fork() with an analogous execution thread, that will still not solve all the problems here. You will also need to correctly handle synchronization between multiple execution threads. Specifically: there are no guarantees whatsoever that a new execution thread will insert something into the vector before the other execution thread attempts to print its contents with printVector. The new execution thread could manage to do that before the original execution thread enters printVector. Or it may not, in which case printVector finds a completely empty vector because the other execution thread hasn't managed to push something into it quickly enough. You now have two completely independent execution threads running at the same time, and you have no guarantees as to which thread does what first.

You can even get a different result every time you run the multithreaded version of the shown program (and you probably will).

When you are ready to begin tackling this new problem, your C++ book will explain how to use condition variables, together with mutexes, to correctly implement multi-threaded synchronization. Unfortunately, this is not a topic that can be completely covered in a brief answer on stackoverflow.com, but it should have several dedicated chapters in your C++ book, where you will find more information.

P.S. The only reason your output shows anything in the input queue is that nothing stops the child process from continuing to execute the program after it exits its if statement, so the child ends up calling printVector itself. That output does not come from the parent process: each child process prints the value it inserted into its own copy of the vector.

Sam Varshavchik

As noted by Miles Budnek, you are creating a new child process. Socket descriptors are OS-level objects that the child inherits across fork(), so they work as expected. Your vector, however, and the memory it is stored in, belong to the parent process; the child only gets its own private copy, so changes made in the new process are never seen by the parent.

Consider looking into std::thread: https://en.cppreference.com/w/cpp/thread/thread

One of the most common ways of starting a thread is with a lambda.

#include <thread>
#include <iostream>

auto print_number(int number) -> void
{
    std::cout << number << std::endl; // This runs in the new thread.
}

int main()
{
    int num = 12;
    auto t = std::thread([num](){print_number(num);}); // Spawn new thread that calls the lambda

    t.join(); // Wait for thread to finish execution
    return 0;
}
Alex Hodges