We have a project in which we've the server side in JAVA and client side in C. For the socket communication we are using Netty on the server side. Once the server is up, the client sends a file name through a message packet to the server. On receiving the packet the server channelRead
calls a method of another class FileBufferReader, in which it reads the file 1024 bytes at a time and puts it in a shared memory and sends a packet to the client which consist of number of bytes read. For executing the shared memory code I am using JNA. Now, on receiving the request from server the client should read that from the shared memory and write it into a file.
Now, I want to synchronize the server message about the content to be read from the shared memory with a wait and issue a notify on receiving a message from the client.
But when I put a wait in the file read loop, no subsequent message is received in the channelRead?
NettyServer.java
public class NettyServer {
private int port;
private Channel channel;
public NettyServer(int port) {
this.port = port;
}
public static void main(String[] args) throws Exception {
int port;
if (args.length > 0) {
port = Integer.parseInt(args[0]);
} else {
port = 8080;
}
new NettyServer(port).run();
}
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup();
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class).childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestDecoder(), new ResponseDataEncoder(), new ProcessingHandler());
}
}).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture f = b.bind(port).sync();
f.channel().closeFuture().sync();
channel = f.channel();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}
}
ProcessingHandler.java
public class ProcessingHandler extends ChannelInboundHandlerAdapter {
public static final Object responseLock = new Object();
@Override
public void channelActive(ChannelHandlerContext ctx) {
// The channel is active and connected to a client
System.out.println("Connected by client");
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof RequestData) {
RequestData request = (RequestData) msg;
System.out.println(request);
processRequest(request, ctx);
synchronized (responseLock) {
responseLock.notify();
}
} else {
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
System.out.println("Disconnected by client");
}
private void processRequest(RequestData request, ChannelHandlerContext ctx) {
char msgType = 'J';
// Create a corresponding response
if (request.getStringValue().equals("RUN")) {
ResponseReadShm responseData = new ResponseReadShm();
responseData.setMsgType(msgType);
String str = "READ";
responseData.setIntValue(str.length());
responseData.setMsg(str);
FileBufferReader f = new FileBufferReader();
String filename = request.getFilename();
f.readFile(filename, ctx, responseLock);
} else if (request.getStringValue().equals("FREE")) {
} else {
}
}
}
FileBufferReader.java
public class FileBufferReader {
private static final int K = 1024; // Chunk size in bytes
private static final int CHUNK_SIZE = 1*K; // Chunk size in Kb
public void readFile(String filePath, ChannelHandlerContext ctx, Object responseLock) {
try (FileInputStream fis = new FileInputStream(filePath)) {
byte[] buffer = new byte[CHUNK_SIZE];
int bytesRead;
SharedMemory shm = new SharedMemory();
long offset = 0;
while ((bytesRead = fis.read(buffer)) != -1) {
// Process the chunk of data here
offset += bytesRead;
shm.writeToSharedMemory(buffer, bytesRead, offset);
ResponseReadShm responseData = new ResponseReadShm();
responseData.setMsgType('J');
String str = "FREAD";
responseData.setIntValue(str.length());
responseData.setMsg(str);
responseData.setBytes(bytesRead);
ctx.writeAndFlush(responseData);
synchronized (responseLock) {
responseLock.wait();
}
}
ResponseReadShm responseData = new ResponseReadShm();
responseData.setMsgType('J');
String str = "FREAD_END";
responseData.setIntValue(str.length());
responseData.setMsg(str);
responseData.setBytes(-1);
ctx.writeAndFlush(responseData);
shm.cleanup();
fis.close();
} catch (Exception e) {
e.printStackTrace();
}
}
client.c
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
typedef struct {
char charValue;
int intValue;
char stringValue[256];
int filename_strlen;
char filename[128];
} RequestData;
typedef struct {
char msgType;
int intValue;
char msg[256];
int bytesRead;
} ResponseData;
void sendRequestData(int socket, const RequestData* data) {
// Convert the request data to a binary format
char buffer[512];
char charValue = data->charValue;
int intValue = htonl(data->intValue);
int strLen = strlen(data->stringValue);
int netStrLen = htonl(strLen);
memcpy(buffer, &charValue, sizeof(char)); // Copy the char value
memcpy(buffer + sizeof(char), &intValue, sizeof(int)); // int val
memcpy(buffer + sizeof(char) + sizeof(int), &netStrLen, sizeof(int)); // sizeof str
memcpy(buffer + sizeof(char) + 2 * sizeof(int), data->stringValue, strLen); // str
int strLen2 = strlen(data->filename);
int netStrLen2 = htonl(strLen2);
memcpy(buffer + sizeof(char) + 2 * sizeof(int) + strLen, &netStrLen2, sizeof(int)); // str
memcpy(buffer + sizeof(char) + 2 * sizeof(int) + strLen + sizeof(int), data->filename, strLen2); // str
// Send the binary data to the server
send(socket, buffer, sizeof(char) + 3 * sizeof(int) + strLen + strLen2, 0);
}
void readResponseData(int socket, ResponseData* data) {
// Read the response data from the server
recv(socket, &(data->msgType), sizeof(char), 0);
recv(socket, &(data->intValue), sizeof(int), 0);
data->intValue = ntohl(data->intValue);
int strLen;
/*recv(socket, &strLen, sizeof(int), 0);*/
strLen = data->intValue;
recv(socket, data->msg, strLen, 0);
data->msg[strLen] = '\0'; // Null-terminate the string
/*char buf[10];
recv(socket, buf, sizeof(char), 0);*/
recv(socket, &(data->bytesRead), sizeof(int), 0);
data->bytesRead = ntohl(data->bytesRead);
}
// Shared memory functions
int createSharedMemory(const char* name, int size) {
key_t key = ftok(name, 1);
int shmid = shmget(key, size, IPC_CREAT | 0666);
if (shmid == -1) {
perror("Failed to create shared memory");
return -1;
}
return shmid;
}
void* attachSharedMemory(const char* name, int bytesRead) {
key_t key = ftok(name, 1);
int shmid = shmget(key, bytesRead, 0);
if (shmid == -1) {
printf("%s", name);
perror("Failed to get shared memory ID");
return NULL;
}
void* shmaddr = shmat(shmid, NULL, 0);
if (shmaddr == (void*)-1) {
perror("Failed to attach shared memory");
return NULL;
}
return shmaddr;
}
int detachSharedMemory(void* shmaddr) {
if (shmdt(shmaddr) == -1) {
perror("Failed to detach shared memory");
return -1;
}
return 0;
}
int destroy(const char* name) {
// destroy the shared memory
key_t key = ftok(name, 1);
int shmid = shmget(key, 0, 0);
if (shmid == -1) {
printf("%s", name);
perror("Failed to destroy shared memory ID");
return -1;
}
shmctl(shmid, IPC_RMID, NULL);
return 0;
}
void readFromSharedMemory(int bytesRead) {
char *shmaddr = attachSharedMemory("shm", bytesRead);
if (shmaddr == NULL) {
exit(1);
}
FILE* file = fopen("shared_memory_dump.txt", "a+"); // Open the file for writing
if (file == NULL) {
perror("fopen");
exit(1);
}
// Read data from shared memory and write it to the file
/*printf("strlen %d ----byteread: %d\n", strlen(shmaddr), bytesRead);
if (strlen(shmaddr) < 1024 )
shmaddr[strlen(shmaddr)+1] = '\0';*/
fwrite(shmaddr, bytesRead, 1, file);
fclose(file); // Close the file
//destroy("shm");
}
void file_terminate(int bytesRead) {
FILE* file = fopen("shared_memory_dump.txt", "w"); // Open the file for writing
if (file == NULL) {
perror("fopen");
exit(1);
}
char null[1] = "\0";
// Read data from shared memory and write it to the file
fwrite(null, 1, 1, file);
}
int main() {
int socketfd;
struct sockaddr_in serverAddr;
char serverIP[] = "127.0.0.1";
int serverPort = 8080;
// Create socket
socketfd = socket(AF_INET, SOCK_STREAM, 0);
if (socketfd < 0) {
perror("Failed to create socket");
return -1;
}
// Set up server details
serverAddr.sin_family = AF_INET;
serverAddr.sin_port = htons(serverPort);
if (inet_pton(AF_INET, serverIP, &(serverAddr.sin_addr)) <= 0) {
perror("Failed to set up server address");
return -1;
}
// Connect to the server
if (connect(socketfd, (struct sockaddr*)&serverAddr, sizeof(serverAddr)) < 0) {
perror("Failed to connect to the server");
return -1;
}
// Prepare request data
RequestData requestData;
requestData.charValue = 'C';
requestData.intValue = 432;
strcpy(requestData.stringValue, "RUN");
printf("Enter filename: ");
scanf("%s", requestData.filename);
// Send request data to the server
sendRequestData(socketfd, &requestData);
int offset = 0;
while (1) {
ResponseData responseData2;
readResponseData(socketfd, &responseData2);
offset += responseData2.bytesRead;
if (!strcmp(responseData2.msg, "FREAD")) {
printf("Received response: {%c %s %d}\n", responseData2.msgType, responseData2.msg, responseData2.bytesRead);
readFromSharedMemory(responseData2.bytesRead);
RequestData requestData;
requestData.charValue = 'C';
requestData.intValue = 432;
strcpy(requestData.stringValue, "FREE");
sendRequestData(socketfd, &requestData);
} else if (!strcmp(responseData2.msg, "FREAD_END")) {
printf("Received response2: {%c %s %d}\n", responseData2.msgType, responseData2.msg, responseData2.bytesRead);
}
}
// Close the socket
close(socketfd);
return 0;
}
What is wrong in the wait? Are we doing something wrong by calling the wait?
Need some idea how to do it? When the wait is not there it's working but the write from the shared memory into a file in the client side is going wrong. So, I thought of it should be synchronized and while
loop for file should not proceed to write to SHM unless a response comes from the client side which will notify
.