2

We have an application on CentOS 6 that calls recvmmsg() on a multicast address to read 1024 UDP packets at a time. When we run multiple instances of this application on the same box (all listening to the same traffic), sometimes this call will block for multiple seconds, despite the socket being non-blocking and MSG_DONTWAIT being passed in. It works fine in all other circumstances, but will freeze under high load (50 MB/s). When the application blocks, we fall behind on the UDP traffic and can't recover. The process is running under the RR scheduler at high priority to avoid interference from other processes. We've tried switching to recvfrom() and recv() in a for loop as well, with the same results.

The only thing that we can see in the kernel source that could block this is spin_lock_irqsave() on the queue lock in __skb_try_recv_datagram(). But I don't know under what circumstances that would be a problem, or what to do about it to prevent blocking, or if that's really the issue.

I'm not sure where to look next, so any pointers would be appreciated.

Created a very simple program that can replicate this on one of the servers where we see this (didn't paste the interface retrieval function, but that shouldn't be relevant here, let me know if you need it).

recv() example:

/*
 * recv() reproduction.
 *
 * Binds a non-blocking UDP socket to port 4755 on INADDR_ANY, joins multicast
 * group 239.255.61.255 on the interface returned by getInterface() (defined
 * elsewhere), and registers the socket with epoll.  Each time epoll reports
 * the socket readable, up to PACKETS_TO_READ datagrams are drained with
 * recv(MSG_DONTWAIT) and the number read plus the elapsed wall-clock time is
 * printed.
 *
 * Returns 1 if any setup step fails or epoll_wait() fails with something other
 * than EINTR; otherwise loops forever.
 */
int main(){
    /* enum constants are true constant expressions, so they may size static
     * arrays (a `const int` may not in C). */
    enum { PACKETS_TO_READ = 1024, MAX_EVENTS = 8 };

    int fd = socket(AF_INET,SOCK_DGRAM,0);
    if(fd < 0){
      printf("Failed to create socket, errno: %d.\n",errno);
      return 1;
    }
    int flags = fcntl(fd,F_GETFL,0);
    if(flags < 0 || fcntl(fd,F_SETFL, flags | O_NONBLOCK) < 0){
      printf("Failed to set O_NONBLOCK, errno: %d.\n",errno);
      return 1;
    }
    int reuse = 1;
    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&reuse,sizeof(reuse)) < 0){
      printf("Failed to set SO_REUSEADDR, errno: %d.\n",errno);
      return 1;
    }

    struct sockaddr_in sockaddr;
    memset(&sockaddr,0,sizeof(sockaddr));   /* do not pass stack garbage to bind() */
    sockaddr.sin_family = AF_INET;
    sockaddr.sin_port = htons(4755);
    sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(fd,(struct sockaddr*)&sockaddr,sizeof(sockaddr)) < 0){
      printf("Failed to bind.\n");
      return 1;
    }

    in_addr_t interface;
    if(!getInterface("192.168.15.255",&interface)){
      printf("Failed to get interface.\n");
      return 1;
    }
    struct ip_mreq imr;
    memset(&imr,0,sizeof(imr));
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    /* IN_MULTICAST expects host byte order; s_addr is in network byte order. */
    if(!IN_MULTICAST(ntohl(imr.imr_multiaddr.s_addr))){
      printf("Group not in multicast.");
      return 1;
    }
    if(setsockopt(fd,IPPROTO_IP,IP_ADD_MEMBERSHIP, (char*)&imr, sizeof(imr)) < 0){
      printf("Failed to add membership, errno: %d.\n",errno);
      return 1;
    }

    int epollInstance = epoll_create1(0);
    if(epollInstance < 0){
      printf("Failed to create epoll instance, errno: %d.\n",errno);
      return 1;
    }
    struct epoll_event registration;
    memset(&registration,0,sizeof(registration));
    registration.events = EPOLLIN;
    registration.data.fd = fd;
    if(epoll_ctl(epollInstance,EPOLL_CTL_ADD,fd,&registration) < 0){
      printf("Failed to register socket with epoll, errno: %d.\n",errno);
      return 1;
    }

    /* Only one descriptor is registered, so a small event array is enough. */
    struct epoll_event epollEvents[MAX_EVENTS];
    /* Every recv() below reuses this single datagram-sized buffer. */
    static char receiveBuffer[USHRT_MAX];

    struct timeval start;
    struct timeval end;

    while(1){
      int selected = epoll_wait(epollInstance,epollEvents,MAX_EVENTS,10);
      if(selected < 0){
        if(errno == EINTR) continue;
        printf("epoll_wait failed, errno: %d.\n",errno);
        return 1;
      }
      if(selected == 0) continue;   /* 10 ms timeout expired with nothing readable */

      gettimeofday(&start,NULL);
      int numPackets = 0;
      for(int i = 0; i < PACKETS_TO_READ; i++){
        ssize_t result = recv(fd,receiveBuffer,sizeof(receiveBuffer),MSG_DONTWAIT);
        if(result < 0){
          /* recv() signals "queue empty" by returning -1 with errno set to
           * EAGAIN/EWOULDBLOCK; it never returns EAGAIN itself. */
          if(errno != EAGAIN && errno != EWOULDBLOCK){
            printf("recv failed, errno: %d.\n",errno);
          }
          break;
        }
        numPackets++;
      }
      gettimeofday(&end,NULL);

      long elapsedMicros = (long)(end.tv_sec - start.tv_sec) * 1000000L
                         + (long)(end.tv_usec - start.tv_usec);
      printf("Got %d packets in %ld microseconds\n",numPackets,elapsedMicros);
    }
}

recvmmsg() example:

/*
 * recvmmsg() reproduction.
 *
 * Binds a non-blocking UDP socket to port 4755 on INADDR_ANY, joins multicast
 * group 239.255.61.255 on the interface returned by getInterface() (defined
 * elsewhere), and registers the socket with epoll.  Each time epoll reports
 * the socket readable, a single recvmmsg(MSG_DONTWAIT) call reads up to
 * PACKETS_TO_READ datagrams and the number read plus the elapsed wall-clock
 * time is printed.
 *
 * Returns 1 if any setup step fails or epoll_wait() fails with something other
 * than EINTR; otherwise loops forever.
 */
int main(){
    /* enum constants are true constant expressions, so they may size static
     * arrays (a `const int` may not in C). */
    enum { PACKETS_TO_READ = 1024, MAX_EVENTS = 8 };

    int fd = socket(AF_INET,SOCK_DGRAM,0);
    if(fd < 0){
      printf("Failed to create socket, errno: %d.\n",errno);
      return 1;
    }
    int flags = fcntl(fd,F_GETFL,0);
    if(flags < 0 || fcntl(fd,F_SETFL, flags | O_NONBLOCK) < 0){
      printf("Failed to set O_NONBLOCK, errno: %d.\n",errno);
      return 1;
    }
    int reuse = 1;
    if(setsockopt(fd,SOL_SOCKET,SO_REUSEADDR,(char*)&reuse,sizeof(reuse)) < 0){
      printf("Failed to set SO_REUSEADDR, errno: %d.\n",errno);
      return 1;
    }

    struct sockaddr_in sockaddr;
    memset(&sockaddr,0,sizeof(sockaddr));   /* do not pass stack garbage to bind() */
    sockaddr.sin_family = AF_INET;
    sockaddr.sin_port = htons(4755);
    sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if(bind(fd,(struct sockaddr*)&sockaddr,sizeof(sockaddr)) < 0){
      printf("Failed to bind.\n");
      return 1;
    }

    in_addr_t interface;
    if(!getInterface("192.168.15.255",&interface)){
      printf("Failed to get interface.\n");
      return 1;
    }
    struct ip_mreq imr;
    memset(&imr,0,sizeof(imr));
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    /* IN_MULTICAST expects host byte order; s_addr is in network byte order. */
    if(!IN_MULTICAST(ntohl(imr.imr_multiaddr.s_addr))){
      printf("Group not in multicast.");
      return 1;
    }
    if(setsockopt(fd,IPPROTO_IP,IP_ADD_MEMBERSHIP, (char*)&imr, sizeof(imr)) < 0){
      printf("Failed to add membership, errno: %d.\n",errno);
      return 1;
    }

    int epollInstance = epoll_create1(0);
    if(epollInstance < 0){
      printf("Failed to create epoll instance, errno: %d.\n",errno);
      return 1;
    }
    struct epoll_event registration;
    memset(&registration,0,sizeof(registration));
    registration.events = EPOLLIN;
    registration.data.fd = fd;
    if(epoll_ctl(epollInstance,EPOLL_CTL_ADD,fd,&registration) < 0){
      printf("Failed to register socket with epoll, errno: %d.\n",errno);
      return 1;
    }

    /* Only one descriptor is registered, so a small event array is enough. */
    struct epoll_event epollEvents[MAX_EVENTS];

    /* One buffer, iovec, header and source-address slot per datagram that a
     * single recvmmsg() call may return.  Static: far too large for the stack. */
    static char receiveBuffer[PACKETS_TO_READ][USHRT_MAX];
    static struct iovec iovecs[PACKETS_TO_READ];
    static struct mmsghdr msgs[PACKETS_TO_READ];
    static struct sockaddr_in sockFrom[PACKETS_TO_READ];
    for (int i = 0; i < PACKETS_TO_READ; i++) {
      iovecs[i].iov_base          = receiveBuffer[i];
      iovecs[i].iov_len           = USHRT_MAX;
      msgs[i].msg_hdr.msg_iov     = &iovecs[i];
      msgs[i].msg_hdr.msg_iovlen  = 1;
      msgs[i].msg_hdr.msg_name    = &sockFrom[i];
      msgs[i].msg_hdr.msg_namelen = sizeof(struct sockaddr_in);
    }

    struct timeval start;
    struct timeval end;

    while(1){
      int selected = epoll_wait(epollInstance,epollEvents,MAX_EVENTS,10);
      if(selected < 0){
        if(errno == EINTR) continue;
        printf("epoll_wait failed, errno: %d.\n",errno);
        return 1;
      }
      if(selected == 0) continue;   /* 10 ms timeout expired with nothing readable */

      gettimeofday(&start,NULL);
      int numPackets = recvmmsg(fd,msgs,PACKETS_TO_READ,MSG_DONTWAIT,NULL);
      int receiveErrno = errno;     /* save before gettimeofday/printf can clobber it */
      gettimeofday(&end,NULL);

      if(numPackets < 0){
        /* -1 is an error indicator, not a packet count.  EAGAIN/EWOULDBLOCK
         * just means the queue was empty; anything else is reported. */
        if(receiveErrno != EAGAIN && receiveErrno != EWOULDBLOCK){
          printf("recvmmsg failed, errno: %d.\n",receiveErrno);
        }
        numPackets = 0;
      }

      long elapsedMicros = (long)(end.tv_sec - start.tv_sec) * 1000000L
                         + (long)(end.tv_usec - start.tv_usec);
      printf("Got %d packets in %ld microseconds\n",numPackets,elapsedMicros);
    }
}
Tars AC
  • 79
  • 2
  • 2
    To be clear: you have observed the blocking behavior you describe with these programs only when you run multiple instances at the same time? Bonus question: why would you want to do that? – John Bollinger Jun 26 '17 at 20:38
  • Hmm. It may not be a problem if the program works at all, but its Linux docs say that the argument for the `IP_ADD_MEMBERSHIP` socket option should be a `struct ip_mreqn`, but you are providing a `struct ip_mreq`. The former has an additional member at the end. – John Bollinger Jun 26 '17 at 21:43
  • @JohnBollinger thanks for your response. Yes, we observed blocking behavior only when we run multiple instances. We want to do that because the application is essentially single threaded and this allows us to use multiple cores to do business logic. – Tars AC Jun 27 '17 at 15:52
  • @JohnBollinger For the ip_mreqn vs ip_mreq, my understanding based on the manpage is that this is backwards compatible behavior - struct ip_mreq is the old structure, and ip_mreqn added a member or two to support additional functionality. I assume it figures out which one you are using based on the size passed in, although I haven't checked the kernel source to verify that. The IP_ADD_MEMBERSHIP code is very old (I want to say 15 years old, but might be off) and originally ran on AIX, and then very old Linux kernels. – Tars AC Jun 27 '17 at 15:56
  • I don't have a confident explanation of the nature of the problem, but I do find it suspicious that you may have multiple separate processes concurrently trying to read from distinct open file descriptions connected to the same local address (`sockaddr` sense). The idea makes my skin crawl, actually. I suggest restructuring so that all the processes (or threads) are `recv()`ing from *the same* open file description. You don't even need extra synchronization for that, because [`recv()` is atomic](https://stackoverflow.com/q/1981372/2402272). – John Bollinger Jun 27 '17 at 17:00
  • @JohnBollinger wouldn't that mean that they would not all receive the same messages? Each of the processes would need to get all of the same data, or else the business logic would not work. – Tars AC Jun 27 '17 at 17:44
  • You mean they actually *do* receive the same messages *now*? Now I'm practically squirming. But in that case I even more recommend restructuring the program. As it is now, the kernel <--> userspace bandwidth you require is a multiple of the number of processes involved, but only the bandwidth required by one process is actually necessary if the processes share. – John Bollinger Jun 27 '17 at 17:58

0 Answers