0

We have a project in which we've the server side in JAVA and client side in C. For the socket communication we are using Netty on the server side. Once the server is up, the client sends a file name through a message packet to the server. On receiving the packet the server channelRead calls a method of another class FileBufferReader, in which it reads the file 1024 bytes at a time and puts it in a shared memory and sends a packet to the client which consist of number of bytes read. For executing the shared memory code I am using JNA. Now, on receiving the request from server the client should read that from the shared memory and write it into a file. Now, I want to synchronize the server message about the content to be read from the shared memory with a wait and issue a notify on receiving a message from the client. But when I put a wait in the file read loop, no subsequent message is received in the channelRead?

NettyServer.java

public class NettyServer {

    private int port;
    private Channel channel;
        
    public NettyServer(int port) {
        this.port = port;
    }

    public static void main(String[] args) throws Exception {
        int port;
        if (args.length > 0) {
            port = Integer.parseInt(args[0]);
        } else {
            port = 8080;
        }
        new NettyServer(port).run();
    }

    public void run() throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup();
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler());
                }
            }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);

            ChannelFuture f = b.bind(port).sync();
            f.channel().closeFuture().sync();

            channel = f.channel();
        } finally {
            workerGroup.shutdownGracefully();
            bossGroup.shutdownGracefully();
        }
    }       
}

ProcessingHandler.java

public class ProcessingHandler extends ChannelInboundHandlerAdapter {
    public static final Object responseLock = new Object();

     @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // The channel is active and connected to a client
        System.out.println("Connected by client");
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

        if (msg instanceof RequestData) {
            RequestData request = (RequestData) msg;
            System.out.println(request);
            processRequest(request, ctx);
            synchronized (responseLock) {
                responseLock.notify();
            }
        } else {
        }
    }   

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("Disconnected by client");
    }

    private void processRequest(RequestData request, ChannelHandlerContext ctx) {
        char msgType = 'J';

        // Create a corresponding response
        if (request.getStringValue().equals("RUN")) {
            ResponseReadShm responseData = new ResponseReadShm();
            responseData.setMsgType(msgType);
            String str = "READ";
            responseData.setIntValue(str.length());
            responseData.setMsg(str);
            FileBufferReader f = new FileBufferReader();
            String filename = request.getFilename();
            f.readFile(filename, ctx, responseLock);
        } else if (request.getStringValue().equals("FREE")) {
        } else {
        }
    }
}

FileBufferReader.java

public class FileBufferReader {
    private static final int K = 1024; // Chunk size in bytes
    private static final int CHUNK_SIZE = 1*K; // Chunk size in Kb

    public void readFile(String filePath, ChannelHandlerContext ctx, Object responseLock) {
        try (FileInputStream fis = new FileInputStream(filePath)) { 
            byte[] buffer = new byte[CHUNK_SIZE]; 
            int bytesRead;

            SharedMemory shm = new SharedMemory();
            long offset = 0;
            while ((bytesRead = fis.read(buffer)) != -1) {
                // Process the chunk of data here
                offset += bytesRead;
                shm.writeToSharedMemory(buffer, bytesRead, offset);
                
                ResponseReadShm responseData = new ResponseReadShm();
                responseData.setMsgType('J');
                String str = "FREAD";
                responseData.setIntValue(str.length());
                responseData.setMsg(str);
                responseData.setBytes(bytesRead);

                ctx.writeAndFlush(responseData);
        
                synchronized (responseLock) {
                         responseLock.wait();
                }
            }

            ResponseReadShm responseData = new ResponseReadShm();
            responseData.setMsgType('J');
            String str = "FREAD_END";
            responseData.setIntValue(str.length());
            responseData.setMsg(str);
            responseData.setBytes(-1);
            ctx.writeAndFlush(responseData);

            shm.cleanup();
            fis.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

client.c

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>

typedef struct {
    char charValue;
    int intValue;
    char stringValue[256];
    int filename_strlen;
    char filename[128];
} RequestData;

typedef struct {
    char msgType;
    int intValue;
    char msg[256];
    int bytesRead;
} ResponseData;

void sendRequestData(int socket, const RequestData* data) {
    // Convert the request data to a binary format
    char buffer[512];
    char charValue = data->charValue;
    int intValue = htonl(data->intValue);
    int strLen = strlen(data->stringValue);
    int netStrLen = htonl(strLen);

    memcpy(buffer, &charValue, sizeof(char)); // Copy the char value
    memcpy(buffer + sizeof(char), &intValue, sizeof(int)); // int val
    memcpy(buffer + sizeof(char) + sizeof(int), &netStrLen, sizeof(int)); // sizeof str
    memcpy(buffer + sizeof(char) + 2 * sizeof(int), data->stringValue, strLen); // str

    int strLen2 = strlen(data->filename);
    int netStrLen2 = htonl(strLen2);
    memcpy(buffer + sizeof(char) + 2 * sizeof(int) + strLen, &netStrLen2, sizeof(int)); // str
    memcpy(buffer + sizeof(char) + 2 * sizeof(int) + strLen + sizeof(int), data->filename, strLen2); // str

    // Send the binary data to the server
    send(socket, buffer, sizeof(char) + 3 * sizeof(int) + strLen + strLen2, 0);
}

void readResponseData(int socket, ResponseData* data) {
    // Read the response data from the server
    recv(socket, &(data->msgType), sizeof(char), 0);

    recv(socket, &(data->intValue), sizeof(int), 0);
    data->intValue = ntohl(data->intValue);

    int strLen;
    /*recv(socket, &strLen, sizeof(int), 0);*/
    strLen = data->intValue;

    recv(socket, data->msg, strLen, 0);
    data->msg[strLen] = '\0';  // Null-terminate the string

    /*char buf[10];
    recv(socket, buf, sizeof(char), 0);*/
    recv(socket, &(data->bytesRead), sizeof(int), 0);
    data->bytesRead = ntohl(data->bytesRead);
}

// Shared memory functions
int createSharedMemory(const char* name, int size) {
    key_t key = ftok(name, 1);
    int shmid = shmget(key, size, IPC_CREAT | 0666);
    
    if (shmid == -1) {
        perror("Failed to create shared memory");
        return -1;
    }
    
    return shmid;
}

void* attachSharedMemory(const char* name, int bytesRead) {
    key_t key = ftok(name, 1);
    int shmid = shmget(key, bytesRead, 0);
    
    if (shmid == -1) {
        printf("%s", name);
        perror("Failed to get shared memory ID");
        return NULL;
    }
    
    void* shmaddr = shmat(shmid, NULL, 0);
    
    if (shmaddr == (void*)-1) {
        perror("Failed to attach shared memory");
        return NULL;
    }
    
    return shmaddr;
}

int detachSharedMemory(void* shmaddr) {
    if (shmdt(shmaddr) == -1) {
        perror("Failed to detach shared memory");
        return -1;
    }
    
    return 0;
}

int destroy(const char* name) {
    // destroy the shared memory
    key_t key = ftok(name, 1);
    int shmid = shmget(key, 0, 0);
    
    if (shmid == -1) {
        printf("%s", name);
        perror("Failed to destroy shared memory ID");
        return -1;
    }

    shmctl(shmid, IPC_RMID, NULL);
    return 0;
}

void readFromSharedMemory(int bytesRead) {
    char *shmaddr = attachSharedMemory("shm", bytesRead);   
    if (shmaddr == NULL) {
        exit(1);
    }

    FILE* file = fopen("shared_memory_dump.txt", "a+");  // Open the file for writing
    if (file == NULL) {
        perror("fopen");
        exit(1);
    }

    // Read data from shared memory and write it to the file
    /*printf("strlen %d ----byteread: %d\n", strlen(shmaddr), bytesRead);
    if (strlen(shmaddr) < 1024 )
        shmaddr[strlen(shmaddr)+1] = '\0';*/

    fwrite(shmaddr, bytesRead, 1, file);
    fclose(file);  // Close the file

    //destroy("shm");
}

void file_terminate(int bytesRead) {
    FILE* file = fopen("shared_memory_dump.txt", "w");  // Open the file for writing
    if (file == NULL) {
        perror("fopen");
        exit(1);
    }

    char null[1] = "\0"; 
    // Read data from shared memory and write it to the file
    fwrite(null, 1, 1, file);
}

int main() {
    int socketfd;
    struct sockaddr_in serverAddr;
    char serverIP[] = "127.0.0.1";
    int serverPort = 8080;

    // Create socket
    socketfd = socket(AF_INET, SOCK_STREAM, 0);
    if (socketfd < 0) {
        perror("Failed to create socket");
        return -1;
    }

    // Set up server details
    serverAddr.sin_family = AF_INET;
    serverAddr.sin_port = htons(serverPort);
    if (inet_pton(AF_INET, serverIP, &(serverAddr.sin_addr)) <= 0) {
        perror("Failed to set up server address");
        return -1;
    }

    // Connect to the server
    if (connect(socketfd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
        perror("Failed to connect to the server");
        return -1;
    }

    // Prepare request data

    RequestData requestData;
    requestData.charValue = 'C';
    requestData.intValue = 432;
    strcpy(requestData.stringValue, "RUN");

    printf("Enter filename: ");
    scanf("%s", requestData.filename);

    // Send request data to the server
    sendRequestData(socketfd, &requestData);

    int offset = 0;

    while (1) {
        ResponseData responseData2;
        readResponseData(socketfd, &responseData2);
        offset += responseData2.bytesRead;
        if (!strcmp(responseData2.msg, "FREAD")) {
            printf("Received response: {%c %s %d}\n", responseData2.msgType, responseData2.msg, responseData2.bytesRead);
            readFromSharedMemory(responseData2.bytesRead);
            RequestData requestData;
            requestData.charValue = 'C';
            requestData.intValue = 432;
            strcpy(requestData.stringValue, "FREE");
            sendRequestData(socketfd, &requestData);
        } else if (!strcmp(responseData2.msg, "FREAD_END")) {
            printf("Received response2: {%c %s %d}\n", responseData2.msgType, responseData2.msg, responseData2.bytesRead);
        } 
    }

    // Close the socket
    close(socketfd);

    return 0;
}

What is wrong in the wait? Are we doing something wrong by calling the wait? Need some idea how to do it? When the wait is not there it's working but the write from the shared memory into a file in the client side is going wrong. So, I thought of it should be synchronized and while loop for file should not proceed to write to SHM unless a response comes from the client side which will notify.

devd
  • 370
  • 10
  • 28

0 Answers0