
This is what my server looks like:

-WorkerThread(s):

  • calls epoll_wait, accepts connections, sets each fd non-blocking and registers it with EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP
  • on an EPOLLIN event: calls recv until EAGAIN and pushes all data to the global RecvBuffer (synchronized with a pthread_mutex)
  • on an EPOLLOUT event: accesses the global SendBuffer and, if there is data queued for the ready fd, sends it in a while loop until EAGAIN or until all data is sent; when a whole packet has been sent, it is popped from SendBuffer

-IOThread(s)

  • takes data from the global RecvBuffer and processes it
  • sends the response by first trying to call send right away; if not all data is sent, the rest is pushed onto the global SendBuffer to be sent from the WorkerThread

The problem is that the server doesn't send all queued data (some is left in SendBuffer), and the amount of unsent data grows with the number of clients. For testing I'm using only 1 WorkerThread and 1 IOThread, but using more doesn't seem to make any difference. Access to the global buffers is protected with a pthread_mutex. My response size is 130k bytes (it takes at least 3 send calls to send that much). On the other side is a Windows client using blocking sockets.

Thank you very much! MJ

edit:

Yes, by default I'm waiting for EPOLLOUT events even though I have nothing to send. I did it this way for implementation simplicity and because the man page suggests it. My understanding was this:

Even if I "miss" an EPOLLOUT event at a time when I don't want to send anything, it's no problem, because when I do want to send data I'll call send until EAGAIN, and EPOLLOUT should be triggered again in the future (and most of the time it is).

Now I modified code to switch between IN/OUT events:

On accept:

event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_ADD, infd, &event);

when all data has been sent:

event.events = EPOLLIN | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);

when I reach EAGAIN by calling send in IOThread:

event.events = EPOLLOUT | EPOLLET | EPOLLRDHUP;
epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, events[i].data.fd, &event);

...and I get the same behavior. I also tried removing the EPOLLET flag, and nothing changed.

One side question: does epoll_ctl with EPOLL_CTL_MOD replace the events member, or just OR it with the given argument?

EDIT3: Updated the IOThread function to send continuously until all data has been sent or until EAGAIN. I also tried to keep sending even after all data was sent, but most of the time I got errno 88 (Socket operation on non-socket).

EDIT4: I fixed some bugs in my sending code, so I no longer have queued data that never gets sent. But now I don't receive as much data as I should :)) I lose the most data when the client calls recv right after its send completes, and the loss grows with the number of clients. When I put a 2-second delay between the send and recv calls on the client (blocking calls), I lose little to no data on the server, depending on how many clients I'm running (the client test code is a simple for loop with 1 send followed by 1 recv). Again, I tried with and without ET mode. Below is the updated WorkerThread function, which is responsible for receiving data. @Admins/Mods: maybe I should open a new topic now, as the problem is a bit different?

void CNetServer::WorkerThread(void* param)
{
    CNetServer* pNetServer =(CNetServer*)param;
    struct epoll_event event;
    struct epoll_event *events;
    int s = 0;

//  events = (epoll_event*)calloc (MAXEVENTS, sizeof event);

    while (1)
    {
        int n, i;

//      printf ("BLOCKING NOW! epoll_wait thread %d\n",pthread_self());
        n = pNetServer->m_epollCtrl.Wait(-1);
//      printf ("epoll_wait thread %d\n",pthread_self());
        pthread_mutex_lock(&g_mtx_WorkerThd);
        for (i = 0; i < n; i++)
        {
            if ((pNetServer->m_epollCtrl.Event(i)->events & EPOLLERR))
            {
                // An error has occurred on this fd, or the socket is not ready for reading (why were we notified then?)

            //  g_SendBufferArray.RemoveAll( 0 );

                char szFileName[30] = {0};
                sprintf( (char*)szFileName,"fd_%d.txt",pNetServer->m_epollCtrl.Event(i)->data.fd );
                remove(szFileName);

            /*  printf( "\n\n\n");
                printf( "\tDATA LEFT COUNT:%d\n",g_SendBufferArray.size());
                for (int k=0;k<g_SendBufferArray.size();k++)
                    printf( "\tSD: %d DATA LEFT:%d\n",g_SendBufferArray[i]->sd,g_SendBufferArray[i]->nBytesSent );
*/

            //  fprintf (stderr, "epoll error\n");
            //  fflush(stdout);
                close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                continue;
            }
            else if (pNetServer->m_ListenSocket == pNetServer->m_epollCtrl.Event(i)->data.fd)
            {
                // We have a notification on the listening socket, which   means one or more incoming connections. 
                while (1)
                {
                    struct sockaddr in_addr;
                    socklen_t in_len;
                    int infd;
                    char hbuf[NI_MAXHOST], sbuf[NI_MAXSERV];

                    in_len = sizeof in_addr;
                    infd = accept (pNetServer->m_ListenSocket, &in_addr, &in_len);
                    if (infd == -1)
                    {
                        if ((errno == EAGAIN) ||
                            (errno == EWOULDBLOCK))
                        {
                            // We have processed all incoming connections.
                            break;
                        }
                        else
                        {
                            perror ("accept");
                            break;
                        }
                    }

                    s = getnameinfo (&in_addr, in_len,
                        hbuf, sizeof hbuf,
                        sbuf, sizeof sbuf,
                        NI_NUMERICHOST | NI_NUMERICSERV);
                    if (s == 0)
                    {
                        printf("Accepted connection on descriptor %d "
                            "(host=%s, port=%s) thread %lu\n", infd, hbuf, sbuf,(unsigned long)pthread_self());
                    }

                    // Make the incoming socket non-blocking and add it to the list of fds to monitor.
                    CEpollCtrl::SetNonBlock(infd,true);
                    if ( !pNetServer->m_epollCtrl.Add( infd, EPOLLIN, NULL ))
                    {
                        perror ("epoll_ctl");
                        abort ();
                    }

                }
                continue;
            }
            if( (pNetServer->m_epollCtrl.Event(i)->events & EPOLLOUT) )
            {

                pNetServer->DoSend( pNetServer->m_epollCtrl.Event(i)->data.fd );
            }
            if( pNetServer->m_epollCtrl.Event(i)->events & EPOLLIN )
            {
                printf("EPOLLIN TRIGGERED FOR SD: %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                // We have data on the fd waiting to be read. 
                int done = 0;
                ssize_t count = 0;
                char buf[512];
                while (1)
                {
                    count = read (pNetServer->m_epollCtrl.Event(i)->data.fd, buf, sizeof buf);
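                    // count is ssize_t (%zd); pthread_t is cast for printing (assumes an integral pthread_t, as on Linux/glibc)
                    printf("recv sd %d size %zd thread %lu\n",pNetServer->m_epollCtrl.Event(i)->data.fd,count,(unsigned long)pthread_self());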
                    if (count == -1)
                    {
                        // If errno == EAGAIN, that means we have read all data. So go back to the main loop.
                        if ( errno != EAGAIN )
                        {
                            perror ("read");
                            done = 1;
                        }
                        break;
                    }
                    else if (count == 0)
                    {
                        //connection is closed by peer.. do a cleanup and close
                        done = 1;
                        break;
                    }
                    else if (count > 0)
                    {
                        static int nDataCounter = 0;
                        nDataCounter+=count;
                        printf("RECVDDDDD %d\n",nDataCounter);
                        CNetServer::s_pRecvContainer->OnData( pNetServer->m_epollCtrl.Event(i)->data.fd, buf, count );
                    }
                }

                if (done)
                {
                    printf ("Closed connection on descriptor %d\n",pNetServer->m_epollCtrl.Event(i)->data.fd);
                    // Closing the descriptor will make epoll remove it from the set of descriptors which are monitored. 
                    close (pNetServer->m_epollCtrl.Event(i)->data.fd);
                }
            }

        }
//      

        pNetServer->IOThread( (void*)pNetServer );

        pthread_mutex_unlock(&g_mtx_WorkerThd);
    }

}

void CNetServer::IOThread(void* param)
{

    BYTEARRAY* pbPacket = new BYTEARRAY;
    int fd;
    struct epoll_event event;
    CNetServer* pNetServer =(CNetServer*)param;

    printf("IOThread startin' !\n");

    for (;;)
    {
        bool bGotIt = CNetServer::s_pRecvContainer->GetPacket( pbPacket, &fd );

        if( bGotIt )
        {

            //process packet here
            printf("Got 'em packet yo !\n");
            BYTE* re = new BYTE[128000];
            memset((void*)re,0xCC,128000);
            buffer_t* responsebuff = new buffer_t( fd, re, 128000 ) ;

            pthread_mutex_lock(&g_mtx_WorkerThd);

            while( 1 )
            {
                    int s;
                    int nSent = send( responsebuff->sd, ( responsebuff->pbBuffer + responsebuff->nBytesSent ),responsebuff->nSize - responsebuff->nBytesSent,0 );
                    printf ("IOT: Trying to send nSent: %d buffsize: %d \n",nSent,responsebuff->nSize - responsebuff->nBytesSent);

                    if (nSent == -1)
                    {

                        if (errno == EAGAIN || errno == EWOULDBLOCK )
                        {
                                g_vSendBufferArray.push_back( *responsebuff );
                                printf ("IOT: now waiting for EPOLLOUT\n");
                                event.data.fd = fd;
                                event.events = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP;
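                                s = epoll_ctl (pNetServer->m_EventFD, EPOLL_CTL_MOD, fd, &event);
                                // Check the result before leaving the loop (the break used to precede this check, making it dead code)
                                if (s == -1)
                                {
                                    perror ("epoll_ctl");
                                    abort ();
                                }
                                break;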

                        }
                        else
                        {
                            printf( "%d\n",errno );
                            perror ("send");
                            break;
                        }
                        printf ("IOT: WOOOOT\n");
                        break;
                    }
                    else if (nSent == responsebuff->nSize - responsebuff->nBytesSent)
                    {
                        printf ("IOT:all is sent! wOOhOO\n");
                        responsebuff->sd = 0;
                        responsebuff->nBytesSent += nSent;
                        delete responsebuff;
                        break;
                    }
                    else if (nSent < responsebuff->nSize - responsebuff->nBytesSent)
                    {
                        printf ("IOT: partial send!\n");
                        responsebuff->nBytesSent += nSent;

                    }

            }
            delete [] re;

            pthread_mutex_unlock(&g_mtx_WorkerThd);

        }
    }

}
Mike Jackson
  • Just to be sure: you're not waiting for EPOLLOUT (i.e. you give it to epoll_ctl()) when you have nothing to send, right? Because if you are, you may get a notification that you can, but you have nothing to send so you forget about it. But when you decide you have something to send, the kernel isn't going to tell you again that you can send. This is so because you enabled EPOLLET. – Ambroz Bizjak Aug 02 '12 at 23:35
  • You should only request events (e.g. EPOLLOUT) when you're actually interested in the event. Otherwise you may either lose notification (EPOLLET) or cycle in an infinite loop (no EPOLLET). – Ambroz Bizjak Aug 02 '12 at 23:37
  • Note that if you do as I suggested (only enable events when you're going to react to them), and you consume events completely (send/read until eagain, or disable the events if you're no longer interested), it wouldn't matter whether you use EPOLLET or not. EDIT: just disable EPOLLET, to be sure. – Ambroz Bizjak Aug 02 '12 at 23:41
  • @Ambroz Thank you very much for your answers. I edited original post. – Mike Jackson Aug 03 '12 at 10:41
  • 1. are you sure you wish to stop receiving when you're sending (you implicitly remove the EPOLLIN flag when you add EPOLLOUT)? 2. yes, EPOLL_CTL_MOD replaces the events. 3. have you considered that you're not getting EPOLLOUT because it's not possible to send? – Ambroz Bizjak Aug 03 '12 at 10:57
  • You may wish to post a complete test case here, not just describe what you're doing. – Ambroz Bizjak Aug 03 '12 at 10:58
  • How are you getting the "not possible to send at this time" condition in the first place? If you're pulling the cable out temporarily, it may take a long time (even minutes) to recover. But if you're just sending out more data than the link can handle, note that the kernel will buffer *a lot* of it, and at some point will decide it no longer needs your data. It will only accept more when it has sent most of it. – Ambroz Bizjak Aug 03 '12 at 11:00
  • In other words, you may just be thinking it's stalled, but the kernel is actively sending stuff from its buffer. Use netstat to check for SendQ size. – Ambroz Bizjak Aug 03 '12 at 11:02
  • 1. Oh, yeah, I surely don't want to stop receiving. Fixed it. – Mike Jackson Aug 03 '12 at 11:07
  • 3. Hmm, how can it be impossible to send for an unlimited amount of time when the clients are blocked on recv waiting for data? This is my client test code, and I will post the server code in 10 minutes: – Mike Jackson Aug 03 '12 at 11:08
  • for (int i=0;i<2000;i++) { send( s, aaa,6,NULL); printf("%d\n",recv( s,(char*)re,128000,MSG_WAITALL) ); } – Mike Jackson Aug 03 '12 at 11:09
  • It can not be possible to send even if clients are always doing recv() if the connection is too slow, or the clients themselves can't process the data fast enough. You should always expect that. How to handle it depends on what you're doing. E.g. if it's a bulk data transfer, you probably want to stop reading the data you're sending so you don't blow up all memory. If it's a slow control connection, it probably means the connection is broken and you can close it. – Ambroz Bizjak Aug 03 '12 at 11:16
  • I'm not sure if that's it, but you seem to have a deadlock between the client and the server: consider what happens if the client is blocked in send(), and the server is also waiting to send stuff (and not receiving at the time, unless you fixed that)! – Ambroz Bizjak Aug 03 '12 at 11:20
  • Well, my test environment is VMware, with the server running on Debian, all on an SSD, so I guess the connection should be OK. Anyway, my server will handle file transfers most of the time, with around 1000 clients at once – Mike Jackson Aug 03 '12 at 11:24
  • One thing seems strange: if you get a "partial send", you stop trying to send() and start waiting for EPOLLOUT. Try sending more, and start waiting for EPOLLOUT only when you really get EAGAIN. – Ambroz Bizjak Aug 03 '12 at 11:27
  • You should probably have a single piece of code for sending, used in both threads. – Ambroz Bizjak Aug 03 '12 at 11:28
  • Yeah, I fixed the part where I switched to EPOLLOUT only without including EPOLLIN too. Clients definitely block on recv. Edit: yeah, I'll encapsulate SendBuffer and epoll too. I should have OnRecv and OnSend for cleaner code :) – Mike Jackson Aug 03 '12 at 11:31
  • Updated the first post with the new IOThread, as you suggested, Ambroz – Mike Jackson Aug 03 '12 at 12:40
  • Forgot to mention that I still have the problem with the edited code – Mike Jackson Aug 03 '12 at 13:04
  • What does `CEpollCtrl::SetNonBlock` actually do? If you block on receive, then you probably haven't actually set the socket non-blocking. – David Schwartz Jul 12 '13 at 23:23

1 Answer

  1. Stop using EPOLLET. It's almost impossible to get right.

  2. Don't ask for EPOLLOUT events if you have nothing to send.

  3. When you have data to send on a connection, follow this logic:

    A) If there's already data in your send queue for that connection, just add the new data. You're done.

    B) Try to send the data immediately. If you send it all, you're done.

    C) Save the leftover data in the send queue for this connection. Now ask for EPOLLOUT for this connection.

David Schwartz
  • 1. I get the same behavior with or without EPOLLET. 2. Stopped doing that (posted updated code). 3. I kind of do that already, actually – Mike Jackson Aug 03 '12 at 13:30
  • @MikeJackson: So what issue(s) are you left with then? With this architecture, it's impossible to "miss" an EPOLLET, since you'll keep getting it as you keep asking for it. – David Schwartz Aug 03 '12 at 13:34
  • With about 50 test clients, each of which just sends a packet and waits for the response 10 to 20 times, half of them get stuck on recv (they don't receive all the data from the server), and on the server I can clearly see that my send buffer holds leftovers which are never picked up and sent later on – Mike Jackson Aug 03 '12 at 13:37
  • Then it sounds like you both aren't doing 3B and either aren't doing 3C or are de-selecting EPOLLIN when you select EPOLLOUT. – David Schwartz Aug 03 '12 at 13:40
  • Uh... I don't think so. I think I'm doing everything as you described. I'm going to start rewriting the code now, so maybe I'll spot some error – Mike Jackson Aug 03 '12 at 14:02