We have an application on CentOS 6 that calls recvmmsg() on a multicast address to read 1024 UDP packets at a time. When we run multiple instances of this application on the same box (all listening to the same traffic), this call will sometimes block for multiple seconds, even though the socket is non-blocking and we pass in MSG_DONTWAIT. It works fine in all other circumstances, but freezes under high load (50 MB/s). When the application blocks, we fall behind on the UDP traffic and can't recover. The process runs under the RR scheduler at high priority to avoid interference from other processes. We've also tried switching to recvfrom() and recv() in a for loop, with the same results.
The only thing we can see in the kernel source that could block here is the spin_lock_irqsave() on the queue lock in __skb_try_recv_datagram(). But I don't know under what circumstances that would be a problem, what to do to prevent the blocking, or whether that's really the issue.
I'm not sure where to look next, so any pointers would be appreciated.
I created a very simple program that replicates this on one of the servers where we see the problem. (I didn't paste the interface-retrieval function, since it shouldn't be relevant here; let me know if you need it.)
recv() example:
/*
 * Minimal reproduction (recv() variant).
 *
 * Binds a non-blocking UDP socket to port 4755, joins multicast group
 * 239.255.61.255, and then loops forever: each time epoll reports the socket
 * readable, it drains up to PACKETS_TO_READ datagrams with recv() and prints
 * how many datagrams were read and how long the drain took.
 *
 * Returns 1 if any setup step or an unexpected receive error occurs;
 * otherwise it never returns.
 */
int main(){
    /* enum constants are true constant expressions, so they can size the
     * static arrays below in C as well as in C++. */
    enum { PACKETS_TO_READ = 1024, MAX_EPOLL_EVENTS = 8192 };

    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        printf("Failed to create socket, errno: %d.\n", errno);
        return 1;
    }

    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        printf("Failed to set non-blocking mode, errno: %d.\n", errno);
        return 1;
    }

    /* SO_REUSEADDR lets several instances bind the same port. */
    int reuse = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) < 0) {
        printf("Failed to set SO_REUSEADDR, errno: %d.\n", errno);
        return 1;
    }

    /* Zero the whole structure so no field is passed to bind() uninitialised. */
    struct sockaddr_in sockaddr;
    memset(&sockaddr, 0, sizeof(sockaddr));
    sockaddr.sin_family = AF_INET;
    sockaddr.sin_port = htons(4755);
    sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(fd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) < 0) {
        printf("Failed to bind.\n");
        return 1;
    }

    /* getInterface() is defined outside this snippet; it is used here as
     * returning non-zero on success and storing the local interface address
     * through its second argument (presumably in network byte order, since it
     * is assigned straight to imr_interface - confirm). */
    in_addr_t interface;
    if (!getInterface("192.168.15.255", &interface)) {
        printf("Failed to get interface.\n");
        return 1;
    }

    struct ip_mreq imr;
    memset(&imr, 0, sizeof(imr));
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    /* IN_MULTICAST expects host byte order; inet_addr() returns network order. */
    if (!IN_MULTICAST(ntohl(imr.imr_multiaddr.s_addr))) {
        printf("Group not in multicast.");
        return 1;
    }
    if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&imr, sizeof(imr)) < 0) {
        printf("Failed to add membership, errno: %d.\n", errno);
        return 1;
    }

    int epollInstance = epoll_create1(0);
    if (epollInstance < 0) {
        printf("Failed to create epoll instance, errno: %d.\n", errno);
        return 1;
    }

    /* Fully initialise the registration record, including the user data. */
    struct epoll_event registration;
    memset(&registration, 0, sizeof(registration));
    registration.events = EPOLLIN;
    registration.data.fd = fd;
    if (epoll_ctl(epollInstance, EPOLL_CTL_ADD, fd, &registration) < 0) {
        printf("Failed to register socket with epoll, errno: %d.\n", errno);
        return 1;
    }

    static struct epoll_event epollEvents[MAX_EPOLL_EVENTS];
    /* Packet contents are discarded, so one datagram-sized buffer suffices. */
    static char receiveBuffer[USHRT_MAX];

    struct timeval start;
    struct timeval end;
    while (1) {
        int selected = epoll_wait(epollInstance, epollEvents, MAX_EPOLL_EVENTS, 10);
        if (selected < 0) {
            if (errno == EINTR) {
                continue;
            }
            printf("epoll_wait failed, errno: %d.\n", errno);
            return 1;
        }
        if (selected == 0) {
            continue; /* 10 ms timeout elapsed with nothing to read */
        }

        gettimeofday(&start, NULL);
        int numPackets = 0;
        while (numPackets < PACKETS_TO_READ) {
            ssize_t result = recv(fd, receiveBuffer, sizeof(receiveBuffer), MSG_DONTWAIT);
            if (result < 0) {
                /* recv() signals failure with -1 and the reason in errno; it
                 * never returns EAGAIN itself. */
                if (errno == EINTR) {
                    continue;
                }
                if (errno == EAGAIN || errno == EWOULDBLOCK) {
                    break; /* receive queue drained */
                }
                printf("recv failed, errno: %d.\n", errno);
                return 1;
            }
            numPackets++;
        }
        gettimeofday(&end, NULL);

        /* The difference is signed, so print it with a signed conversion. */
        long elapsedMicros = (long)(end.tv_sec - start.tv_sec) * 1000000L
                           + (long)(end.tv_usec - start.tv_usec);
        printf("Got %d packets in %ld microseconds\n", numPackets, elapsedMicros);
    }
}
recvmmsg() example:
/*
 * Minimal reproduction (recvmmsg() variant).
 *
 * Binds a non-blocking UDP socket to port 4755, joins multicast group
 * 239.255.61.255, and then loops forever: each time epoll reports the socket
 * readable, it reads up to PACKETS_TO_READ datagrams with a single
 * recvmmsg() call and prints how many were read and how long the call took.
 *
 * Returns 1 if any setup step or an unexpected receive error occurs;
 * otherwise it never returns.
 */
int main(){
    /* enum constants are true constant expressions, so they can size the
     * static arrays below in C as well as in C++. */
    enum { PACKETS_TO_READ = 1024, MAX_EPOLL_EVENTS = 8192 };

    int fd = socket(AF_INET, SOCK_DGRAM, 0);
    if (fd < 0) {
        printf("Failed to create socket, errno: %d.\n", errno);
        return 1;
    }

    int flags = fcntl(fd, F_GETFL, 0);
    if (flags < 0 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) < 0) {
        printf("Failed to set non-blocking mode, errno: %d.\n", errno);
        return 1;
    }

    /* SO_REUSEADDR lets several instances bind the same port. */
    int reuse = 1;
    if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&reuse, sizeof(reuse)) < 0) {
        printf("Failed to set SO_REUSEADDR, errno: %d.\n", errno);
        return 1;
    }

    /* Zero the whole structure so no field is passed to bind() uninitialised. */
    struct sockaddr_in sockaddr;
    memset(&sockaddr, 0, sizeof(sockaddr));
    sockaddr.sin_family = AF_INET;
    sockaddr.sin_port = htons(4755);
    sockaddr.sin_addr.s_addr = htonl(INADDR_ANY);
    if (bind(fd, (struct sockaddr *)&sockaddr, sizeof(sockaddr)) < 0) {
        printf("Failed to bind.\n");
        return 1;
    }

    /* getInterface() is defined outside this snippet; it is used here as
     * returning non-zero on success and storing the local interface address
     * through its second argument (presumably in network byte order, since it
     * is assigned straight to imr_interface - confirm). */
    in_addr_t interface;
    if (!getInterface("192.168.15.255", &interface)) {
        printf("Failed to get interface.\n");
        return 1;
    }

    struct ip_mreq imr;
    memset(&imr, 0, sizeof(imr));
    imr.imr_multiaddr.s_addr = inet_addr("239.255.61.255");
    imr.imr_interface.s_addr = interface;
    /* IN_MULTICAST expects host byte order; inet_addr() returns network order. */
    if (!IN_MULTICAST(ntohl(imr.imr_multiaddr.s_addr))) {
        printf("Group not in multicast.");
        return 1;
    }
    if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (char *)&imr, sizeof(imr)) < 0) {
        printf("Failed to add membership, errno: %d.\n", errno);
        return 1;
    }

    int epollInstance = epoll_create1(0);
    if (epollInstance < 0) {
        printf("Failed to create epoll instance, errno: %d.\n", errno);
        return 1;
    }

    /* Fully initialise the registration record, including the user data. */
    struct epoll_event registration;
    memset(&registration, 0, sizeof(registration));
    registration.events = EPOLLIN;
    registration.data.fd = fd;
    if (epoll_ctl(epollInstance, EPOLL_CTL_ADD, fd, &registration) < 0) {
        printf("Failed to register socket with epoll, errno: %d.\n", errno);
        return 1;
    }

    static struct epoll_event epollEvents[MAX_EPOLL_EVENTS];

    /* One buffer, iovec, header and source-address slot per datagram. The
     * arrays have static storage, so every field not set below starts as
     * zero (no control buffer, flags cleared). */
    static char receiveBuffer[PACKETS_TO_READ][USHRT_MAX];
    static struct iovec iovecs[PACKETS_TO_READ];
    static struct mmsghdr msgs[PACKETS_TO_READ];
    static struct sockaddr_in sockFrom[PACKETS_TO_READ];
    for (int i = 0; i < PACKETS_TO_READ; i++) {
        iovecs[i].iov_base = receiveBuffer[i];
        iovecs[i].iov_len = sizeof(receiveBuffer[i]);
        msgs[i].msg_hdr.msg_iov = &iovecs[i];
        msgs[i].msg_hdr.msg_iovlen = 1;
        msgs[i].msg_hdr.msg_name = &sockFrom[i];
        msgs[i].msg_hdr.msg_namelen = sizeof(sockFrom[i]);
    }

    struct timeval start;
    struct timeval end;
    while (1) {
        int selected = epoll_wait(epollInstance, epollEvents, MAX_EPOLL_EVENTS, 10);
        if (selected < 0) {
            if (errno == EINTR) {
                continue;
            }
            printf("epoll_wait failed, errno: %d.\n", errno);
            return 1;
        }
        if (selected == 0) {
            continue; /* 10 ms timeout elapsed with nothing to read */
        }

        gettimeofday(&start, NULL);
        int numPackets = recvmmsg(fd, msgs, PACKETS_TO_READ, MSG_DONTWAIT, NULL);
        int receiveErrno = errno; /* save before any other call can clobber it */
        gettimeofday(&end, NULL);

        if (numPackets < 0) {
            /* -1 is an error indication, not a packet count. */
            if (receiveErrno == EAGAIN || receiveErrno == EWOULDBLOCK
                || receiveErrno == EINTR) {
                continue; /* nothing available right now; wait again */
            }
            printf("recvmmsg failed, errno: %d.\n", receiveErrno);
            return 1;
        }

        /* The difference is signed, so print it with a signed conversion. */
        long elapsedMicros = (long)(end.tv_sec - start.tv_sec) * 1000000L
                           + (long)(end.tv_usec - start.tv_usec);
        printf("Got %d packets in %ld microseconds\n", numPackets, elapsedMicros);

        /* msg_namelen is value-result: the kernel overwrites it with the
         * actual address length, so restore the capacity before reuse. */
        for (int i = 0; i < numPackets; i++) {
            msgs[i].msg_hdr.msg_namelen = sizeof(sockFrom[i]);
        }
    }
}