0

Basically what I want to do is to trade messages between multithreaded clients to a single thread coordinator using sockets in C++. The clients will ask for a grant permission to access a file and the coordinator will put them in a queue, granting one at a time. I think that's not the best solution, but I thought about creating a socket in port 2000 for the coordinator so that the clients send all the messages there. Each client will create a socket with his identifier (port 2001, 2002, 2003...) so that the coordinator can answer them trough these ports whenever he needs to. The issue I'm having is that the the clients are stuck on the accept() function of their own socket, even the first one, who was granted by the coordinator. Here's my code below (the cout "I'm here" is never shown in the client, that's where it's stuck - line 53):

coordinator.cpp:

#include <iostream>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>

using namespace std;

int main (int argc, char** argv) {

    const int REQUEST = 1;
    const int GRANT = 2;
    const int RELEASE = 3;
    const char* portno = "2000";
    int queue[128] = {0};
    int last = 0;

    int socket_receive, newsocket_receive, socket_send, port_send;
    string msg, msg_send;

    char buffer[256];
    memset(buffer, '|', 256);
    struct addrinfo hints, *res;
    struct addrinfo hints_send, *res_send;
    struct sockaddr_in client_address;

    bzero((char*) &hints, sizeof(hints));

    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    getaddrinfo(NULL,portno,&hints,&res);
    socket_receive = socket(res->ai_family,res->ai_socktype,res->ai_protocol);
    bind(socket_receive, res->ai_addr, res->ai_addrlen);
    listen(socket_receive,5);
    socklen_t client_lenght = sizeof(client_address);

    while (1){
        newsocket_receive = accept(socket_receive,(struct sockaddr*)&client_address,&client_lenght);

        int n = recv(newsocket_receive,buffer,sizeof(buffer),0);
        msg.append(buffer, buffer+n);

        copy(buffer+n, buffer+256, buffer);
        char subarray[n];
        memset(subarray, '|', n);
        copy(subarray, subarray+n, buffer+256-n);

        int elems[2];
        int i = 0;
        while (i < sizeof(msg)-1){
            if (msg[i] == ':'){
                break;
            }
            i++;
        }
        if (i < 4){
            const char* char_msg0 = msg.substr(0,i).c_str();
            const char* char_msg1 = msg.substr(i+1,sizeof(msg)).c_str();
            elems[0] = atoi(char_msg0);
            elems[1] = atoi(char_msg1);

            if (elems[1] == REQUEST){
                if (last == 0){

                    port_send = 2000 + elems[0];
                    string str_port_send = to_string(port_send);
                    char const* char_port_send = str_port_send.c_str();

                    getaddrinfo("localhost",char_port_send,&hints_send,&res_send);
                    socket_send = socket(res_send->ai_family,res_send->ai_socktype,res_send->ai_protocol);
                    connect(socket_send,res_send->ai_addr,res_send->ai_addrlen);
                    msg_send = to_string(GRANT);
                    cout << "Coordinator: sending GRANT message to consumer " << elems[0] << endl;
                    send(socket_send, msg_send.data(),msg_send.size(),0);
                    close(socket_send);

                    queue[last] = elems[0];
                    last++;
                }
                else {
                    queue[last] = elems[0];
                    last++;
                }
            }
            else if (elems[1] == RELEASE){
                //SHIFT QUEUE
                last--;
            }
            msg = "";
        }
     }

    close(socket_receive);

    return 0;
}

client.cpp:

#include <iostream>
#include <pthread.h>
#include <unistd.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netdb.h>
#include <algorithm>

using namespace std;

void *request (void *arg){

    const int REQUEST = 1;
    const int GRANT = 2;
    const int RELEASE = 3;

    int id = *((int *) arg);

    string msg_send, msg_receive;
    const char* portno = "2000";
    const char* port_receive = to_string(2000+id).c_str();
    int socket_send, socket_receive, newsocket_receive;
    char buffer [256];
    struct addrinfo hints, *res;
    struct addrinfo hints_receive, *res_receive;
    struct sockaddr_in client_address;


    bzero((char*) &hints, sizeof(hints));

    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_flags = AI_PASSIVE;

    getaddrinfo("localhost",portno,&hints,&res);
    socket_send = socket(res->ai_family,res->ai_socktype,res->ai_protocol);
    connect(socket_send,res->ai_addr,res->ai_addrlen);
    msg_send = to_string(id) + ":" + to_string(REQUEST);
    cout << "Client: sending REQUEST message to coordinator." << endl;
    send(socket_send,msg_send.data(),msg_send.size(),0);
    close(socket_send);

    getaddrinfo(NULL,port_receive,&hints_receive,&res_receive);
    socket_receive = socket(res_receive->ai_family,res_receive->ai_socktype,res_receive->ai_protocol);
    bind(socket_receive, res_receive->ai_addr, res_receive->ai_addrlen);
    listen(socket_receive,5);
    socklen_t client_lenght = sizeof(client_address);

    newsocket_receive = accept(socket_receive,(struct sockaddr*)&client_address,&client_lenght);
    cout << "I'm here" << endl;
    int n = recv(newsocket_receive,buffer,sizeof(buffer),0);
    msg_receive.append(buffer, buffer+n);
    cout << msg_receive << endl;
    copy(buffer+n, buffer+256, buffer);
    char subarray[n];
    memset(subarray, '|', n);
    copy(subarray, subarray+n, buffer+256-n);
    close(socket_receive);

}

int main (int argc, char** argv) {

    int n_threads = atoi(argv[1]);
    pthread_t threads[n_threads];

    for (long i=1; i<=n_threads; i++){
        int *arg = (int *) malloc(sizeof(*arg));
        *arg = i;
        pthread_create(&threads[i], NULL, request, arg);
    }

    pthread_exit(NULL);
    return 0;
}

1 Answers1

0

You are spending a lot of time making multiple connections between the server and the clients. I would try and re-use the first socket connection that the client makes with connect(), since the socket is bi-directional.

As far as you connection mechanics, the first problem I see is that you are calling listen() inside the loop. You should only call it once for the server.

woolstar
  • 5,063
  • 20
  • 31