
Hey there StackOverflow people!

I'm making an IOCP server and I have ironed out most issues so far, but one still remains and I don't know where to start looking. When I run the client and server on the same machine, everything is fine and dandy: it matches the speed of the Windows SDK sample, maybe a little faster, and definitely uses fewer CPU cycles. However, when I run the client from a separate computer, the transfer speed caps at 37 KB/s and the round-trip latency is 200 ms (as opposed to 0). If I connect the same client to the SDK sample server I don't have that problem, so something must be wrong with my code. As far as I can tell, the sockets are initialized the exact same way with the same options. I have also run my server in a profiler to check for bottlenecks, but I couldn't find any. The computers I tried it on were connected to the same gigabit switch (with gigabit adapters). I know this is kind of vague, but that's because I haven't been able to pinpoint the problem so far, and I would be eternally grateful if any of you could point me in the right direction.

Cheers,

-Roxy

EDIT2: After following Mike's advice, I did some research on the code and found that when a remote client connects to the server, most of the time the code is waiting on GetQueuedCompletionStatus. This suggests that the IO requests are simply taking a long time to complete, but I still don't understand why. It only happens when the client is on a remote computer. I'm thinking it has something to do with how I set up the sockets or how I post the requests, but I don't see any difference from the sample code.

Any ideas?

EDIT (Added sample code):

Alright, here it is! It ain't pretty though!

If you have the Windows SDK installed, you can connect to it with the iocpclient sample (Program Files\Microsoft SDKs\Windows\v7.1\Samples\netds\winsock\iocp\client) by changing its default port on line 73 to 5000.

A weird thing I've just noticed when trying it myself is that the sample iocpclient doesn't seem to hit the same 37 KB/s cap... However, it looks like the sample client has a limit of around 800 KB/s built in. I'll post a client if that can be of any help.

#pragma comment(lib, "Ws2_32.lib")

#include <WinSock2.h>
#include <stdio.h>

unsigned int connection = 0;
unsigned int upload = 0;
unsigned int download = 0;

#define IO_CONTEXT_COUNT 5

class NetClientHost
{
    friend class gNetProtocolHost;
public:
    enum Operation
    {
        kOperationUnknown,
        kOperationRead,
        kOperationWrite,
    };

    struct ClientData
    {
        SOCKET           socket;
    };

    struct IOContext
    {
        WSAOVERLAPPED    overlapped;
        WSABUF           wsaReceiveBuf;
        WSABUF           wsaSendBuf;
        char            *buf;
        char            *TESTbuf;
        unsigned long    bytesReceived;
        unsigned long    bytesSent;
        unsigned long    flags;
        unsigned int     bytesToSendTotal;
        unsigned int     remainingBytesToSend;
        unsigned int     chunk;
        Operation        operation;
    };

    NetClientHost()
    {
        memset((void *) &m_clientData, 0, sizeof(m_clientData));
    }

    IOContext *AcquireContext()
    {
        while (true)
        {
            for (int i = 0; i < IO_CONTEXT_COUNT; ++i)
            {
                if (!(m_ioContexts + i)->inUse)
                {
                    InterlockedIncrement(&(m_ioContexts + i)->inUse);
                    //ResetEvent(*(m_hContextEvents + i));

                    if ((m_ioContexts + i)->ioContext.TESTbuf == 0)
                        Sleep(1);

                    return &(m_ioContexts + i)->ioContext;
                }
            }
            //++g_blockOnPool;
            //WaitForMultipleObjects(IO_CONTEXT_COUNT, m_hContextEvents, FALSE, INFINITE);
        }
    }

    const ClientData *GetClientData() const
    {
        return &m_clientData;
    }

    void Init(unsigned int bufferSize)
    {
        _InitializeIOContexts(bufferSize ? bufferSize : 1024);
    }

    void ReleaseContext(IOContext *ioContext)
    {
        int i = sizeof(_IOContextData), j = sizeof(IOContext);
        _IOContextData *contextData = (_IOContextData *) (((char *) ioContext) - (i - j));
        InterlockedDecrement(&contextData->inUse);
        //SetEvent(*(m_hContextEvents + contextData->index));
    }

    struct _IOContextData
    {
        unsigned int index;
        volatile long inUse;
        IOContext ioContext;
    };

    ClientData                    m_clientData;
    _IOContextData               *m_ioContexts;
    HANDLE                       *m_hContextEvents;

    void _InitializeIOContexts(unsigned int bufferSize)
    {
        m_ioContexts = new _IOContextData[IO_CONTEXT_COUNT];
        m_hContextEvents = new HANDLE[IO_CONTEXT_COUNT];

        memset((void *) m_ioContexts, 0, sizeof(_IOContextData) * IO_CONTEXT_COUNT);

        for (int i = 0; i < IO_CONTEXT_COUNT; ++i)
        {
            (m_ioContexts + i)->index = i;

            (m_ioContexts + i)->ioContext.buf = new char[bufferSize];
            (m_ioContexts + i)->ioContext.wsaReceiveBuf.len = bufferSize;
            (m_ioContexts + i)->ioContext.wsaReceiveBuf.buf = (m_ioContexts + i)->ioContext.buf;
            (m_ioContexts + i)->ioContext.TESTbuf = new char[10000];
            (m_ioContexts + i)->ioContext.wsaSendBuf.buf = (m_ioContexts + i)->ioContext.TESTbuf;

            *(m_hContextEvents + i) = CreateEvent(0, TRUE, FALSE, 0);
        }
    }

    void _SetSocket(SOCKET socket)
    {
        m_clientData.socket = socket;
    }
};



bool WriteChunk(const NetClientHost *clientHost, NetClientHost::IOContext *ioContext)
{
    int status;

    status = WSASend(clientHost->GetClientData()->socket, &ioContext->wsaSendBuf, 1, &ioContext->bytesSent, ioContext->flags, &ioContext->overlapped, 0);
    if (status == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        // ...
        return false;
    }

    return true;
}

bool Write(NetClientHost *clientHost, void *buffer, unsigned int size, unsigned int chunk)
{
    //__ASSERT(m_clientHost);
    //__ASSERT(m_clientHost->GetClientData()->remainingBytesToSend == 0);

    NetClientHost::IOContext *ioContext = clientHost->AcquireContext();

    if (!chunk)
        chunk = size;

    ioContext->wsaSendBuf.buf = ioContext->TESTbuf;

    ioContext->operation                = NetClientHost::kOperationWrite;
    ioContext->flags                    = 0;
    ioContext->wsaSendBuf.buf           = new char[size];
    memcpy((void *) ioContext->wsaSendBuf.buf, buffer, chunk);
    ioContext->wsaSendBuf.len           = chunk;
    ioContext->chunk                    = chunk;
    ioContext->bytesToSendTotal         = size;
    ioContext->remainingBytesToSend     = size;

    return WriteChunk(clientHost, ioContext);
}



void Read(NetClientHost *clientHost)
{
    NetClientHost::IOContext *ioContext = clientHost->AcquireContext();
    int status;

    memset((void *) ioContext, 0, sizeof(NetClientHost::IOContext));
    ioContext->buf = new char[1024];
    ioContext->wsaReceiveBuf.len = 1024;
    ioContext->wsaReceiveBuf.buf = ioContext->buf;

    ioContext->flags = 0;
    ioContext->operation = NetClientHost::kOperationRead;

    status = WSARecv(clientHost->GetClientData()->socket, &ioContext->wsaReceiveBuf, 1, &ioContext->bytesReceived, &ioContext->flags, &ioContext->overlapped, 0);
    int i = WSAGetLastError();
    if (status == SOCKET_ERROR && WSAGetLastError() != WSA_IO_PENDING)
    {
        // ...
    }
}

bool AddSocket(HANDLE hIOCP, SOCKET socket)
{
    ++connection;

    int bufSize = 0;
    LINGER lingerStruct;
    lingerStruct.l_onoff = 1;
    lingerStruct.l_linger = 0;
    setsockopt(socket, SOL_SOCKET, SO_SNDBUF, (char *) &bufSize, sizeof(int));
    setsockopt(socket, SOL_SOCKET, SO_RCVBUF, (char *) &bufSize, sizeof(int));
    setsockopt(socket, SOL_SOCKET, SO_LINGER, (char *) &lingerStruct, sizeof(lingerStruct));

    NetClientHost *clientHost = new NetClientHost;

    clientHost->_InitializeIOContexts(1024);
    clientHost->Init(0);
    clientHost->_SetSocket(socket);

    // Add this socket to the IO completion port
    CreateIoCompletionPort((HANDLE) socket, hIOCP, (DWORD_PTR) clientHost, 0);

    Read(clientHost);
    return true;
}

int read = 0, write = 0;

DWORD WINAPI WorkerThread(LPVOID param)
{
    LPOVERLAPPED overlapped;
    NetClientHost *clientHost;
    HANDLE hIOCP = (HANDLE) param;
    DWORD ioSize;
    BOOL status;

    while (true)
    {
        status = GetQueuedCompletionStatus(hIOCP, &ioSize, (PULONG_PTR) &clientHost, (LPOVERLAPPED *) &overlapped, INFINITE);

        if (!(status || ioSize))
        {
            --connection;
            //_CloseConnection(clientHost);
            continue;
        }

        NetClientHost::IOContext *ioContext = (NetClientHost::IOContext *) overlapped;

        switch (ioContext->operation)
        {
        case NetClientHost::kOperationRead:
            download += ioSize;
            Write(clientHost, ioContext->wsaReceiveBuf.buf, ioSize, 0);
            write++;
            clientHost->ReleaseContext(ioContext);
            break;

        case NetClientHost::kOperationWrite:
            upload += ioSize;
            if (ioContext->remainingBytesToSend)
            {
                ioContext->remainingBytesToSend -= ioSize;
                ioContext->wsaSendBuf.len = ioContext->chunk <= ioContext->remainingBytesToSend ? ioContext->chunk : ioContext->remainingBytesToSend; // equivalent to min(clientData->chunk, clientData->remainingBytesToSend);
                ioContext->wsaSendBuf.buf += ioContext->wsaSendBuf.len;
            }

            if (ioContext->remainingBytesToSend)
            {
                WriteChunk(clientHost, ioContext);
            }
            else
            {
                clientHost->ReleaseContext(ioContext);
                Read(clientHost);
                read++;
            }
            break;
        }
    }

    return 0;
}

DWORD WINAPI ListenThread(LPVOID param)
{
    SOCKET sdListen = (SOCKET) param;

    HANDLE hIOCP = CreateIoCompletionPort(INVALID_HANDLE_VALUE, 0, 0, 0);
    CreateThread(0, 0, WorkerThread, hIOCP, 0, 0);
    CreateThread(0, 0, WorkerThread, hIOCP, 0, 0);
    CreateThread(0, 0, WorkerThread, hIOCP, 0, 0);
    CreateThread(0, 0, WorkerThread, hIOCP, 0, 0);

    while (true)
    {
        SOCKET as = WSAAccept(sdListen, 0, 0, 0, 0);
        if (as != INVALID_SOCKET)
            AddSocket(hIOCP, as);
    }
}

int main()
{
    SOCKET      sdListen;
    SOCKADDR_IN si_addrlocal;
    int         nRet;
    int         nZero = 0;
    LINGER      lingerStruct;

    WSADATA wsaData;
    WSAStartup(0x202, &wsaData);

    sdListen = WSASocket(AF_INET, SOCK_STREAM, IPPROTO_IP, NULL, 0, WSA_FLAG_OVERLAPPED);
    si_addrlocal.sin_family = AF_INET;
    si_addrlocal.sin_port = htons(5000);
    si_addrlocal.sin_addr.s_addr = htonl(INADDR_ANY);
    nRet = bind(sdListen, (struct sockaddr *) &si_addrlocal, sizeof(si_addrlocal));
    nRet = listen(sdListen, 5);

    nZero = 0;
    nRet = setsockopt(sdListen, SOL_SOCKET, SO_SNDBUF, (char *) &nZero, sizeof(nZero));
    nZero = 0;
    nRet = setsockopt(sdListen, SOL_SOCKET, SO_RCVBUF, (char *) &nZero, sizeof(nZero));
    lingerStruct.l_onoff = 1;
    lingerStruct.l_linger = 0;
    nRet = setsockopt(sdListen, SOL_SOCKET, SO_LINGER, (char *) &lingerStruct, sizeof(lingerStruct));

    CreateThread(0, 0, ListenThread, (LPVOID) sdListen, 0, 0);

    HANDLE console = GetStdHandle(STD_OUTPUT_HANDLE);
    while (true)
    {
        COORD c = {0};
        SetConsoleCursorPosition(console, c);
        printf("Connections: %i                      \nUpload: %iKB/s               \nDownload: %iKB/s              ", connection, upload * 2 / 1024, download * 2 / 1024);
        upload = 0;
        download = 0;
        Sleep(500);
    }

    return 0;
}
Pixy
  • Belongs on http://serverfault.com – Paul R Feb 15 '11 at 21:33
  • @Paul Y 2 lazy 2 vote 2 close? – bzlm Feb 15 '11 at 21:36
  • 1
    @Paul, @bzlm: This is clearly a programming question. It does not belong on SF. – Marcelo Cantos Feb 15 '11 at 21:41
  • @Roxanne: You need to show some code. Try to reduce it to the bare minimum of code that exhibits the problem, and add it to your question. – Marcelo Cantos Feb 15 '11 at 21:42
  • IOCP are designed to serve a lot of simultaneous connections, not necessarily maximum bandwidth per connection, but even with just a single connection you should be getting a lot better throughput than that. – Ben Voigt Feb 15 '11 at 22:23
  • @Marcelo: sorry - it looks more like a networking question to me - I don't see any *code*... – Paul R Feb 15 '11 at 22:36
  • @Marcelo: Code added, thanks for your constructive reply :) – Pixy Feb 15 '11 at 23:57
  • setting SO_SNDBUF SO_RCVBUF to 0 seems odd, though that's perhaps a winsock thing – nos Feb 16 '11 at 00:00
  • @Paul: Even before adding code to the question, the OP was talking about "making an IOCP server", "SDK samples", running things "in a profiler". – Marcelo Cantos Feb 16 '11 at 03:19
  • @nos: For the bit of code I've posted, I copied the socket initialization from the iocp SDK sample. Here's its rationalization: "Disable receive buffering on the socket. Setting SO_RCVBUF to 0 causes winsock to stop bufferring receive and perform receives directly from our buffers, thereby reducing CPU usage." I don't know if that's true; I plan on experimenting to see what works best. – Pixy Feb 16 '11 at 18:01

1 Answer


This kind of asynchronous system should be able to run at full datalink speed. Problems I've found include:

  • timeout settings causing needless retransmissions
  • in the receiving process, received message A might trigger a database update, so that received message B has to wait, causing an unnecessary delay in the response to message B back to the sender, when the DB update could actually be done in idle time.

There's something called Wireshark that can give you some visibility into the message traffic. I used to do this the hard way, with time-stamped message logs.

BTW: I would first use this method on the individual processes to clean out any bottlenecks, before doing the asynchronous analysis. If you haven't done this, you can bet there are some in there. Not just any old profiler is reliable; there are good ones, including Zoom.

Mike Dunlavey
  • Hi Mike, right now it does nothing but post back whatever was sent to it, just to test my code before moving on to something else, so that rules out any delay caused by a DB or anything of the like. Also, the code runs fine (around 20 Mb/s) when I run it on my local computer, which is why I think the problem is in the network code more than anything else. I'll definitely look into the link you posted and reply with my results. Thanks :) – Pixy Feb 16 '11 at 00:03
  • @Mike: I've read and followed the instructions you posted in the other thread, and most of the time the code is waiting on GetQueuedCompletionStatus. This suggests the IO requests are simply taking a long time to complete, but I still don't understand why. It only happens when the client is on a remote computer. I'm thinking it has something to do with how I set up the sockets or how I post the requests, but I don't see any difference from the sample code. – Pixy Feb 17 '11 at 17:16
  • @Roxanne: That's good. It should be spending most of its time waiting on IO. The next question is, for what purpose? I haven't studied the code, but each time you pause it, if you look up the stack you should be able to tell if it's waiting for a "send" to complete, or a "receive", and if so, what kind of message (request, data, acknowledgement, etc). That should give you a clue if unnecessary messages are being exchanged. – Mike Dunlavey Feb 17 '11 at 21:28
  • @Roxanne: I've seen cases where both sides spent a lot of time saying, in effect, "Excuse me, what did you say? Can you hear me? Are you still there?" or "Hang on while I hit the DNS server to look up your IP address again." – Mike Dunlavey Feb 17 '11 at 21:51
  • @Mike: It seems it's always waiting on a receive. In the code I provided in my question there is no message protocol; the server simply sends back whatever bytes you send it. While playing with it I discovered an interesting fact: in the client I had set MSG_WAITALL on recv to make sure it received all 1080 bytes of data before returning. If I remove that flag, the transfer starts at a much faster speed but eventually slows down to a crawl. I'll continue to investigate, but right now I'm totally clueless as to why this is happening. Thanks for all your help so far, btw. – Pixy Feb 18 '11 at 20:00
  • @Roxanne: Not to belabor, but as I study the code, I see ListenThread being created, which creates 4 WorkerThreads and then goes into a loop accepting sockets. I assume that doesn't do much. When you pause it, you see worker threads in GetQueuedCompletionStatus, and when you let it step out, it has kOperationRead as its status. Then you Write some stuff, release a context, and loop. Is it possible you're creating stuff and not releasing it? Also, I wonder why 4 worker threads when you've got just one socket. Of course, I'm sure I don't really understand it. – Mike Dunlavey Feb 18 '11 at 22:32
  • @Mike: Thanks for looking into my code! For every client, WSAAccept returns a new socket which is added to the IOCP, and an initial read is posted. The Read and Write methods acquire a context and associate it with the request. This context is released when the operation completes. (AcquireContext returns a context from its pool; if none is available, it blocks until one is released.) The thread pool (4 in this example, but cpu_count * 2 in my actual code) serves the read/write requests that have completed. – Pixy Feb 21 '11 at 14:55
  • @Mike: [cont] There are as many sockets as there are clients connected to the server. It is possible the example code leaks stuff; the actual code doesn't leak on shutdown, but I haven't checked yet whether it does during execution. Hope this helps you understand the code; let me know if I left anything unclear. Thanks again for the help. – Pixy Feb 21 '11 at 14:55