
We all know that using asyncio substantially improves the performance of a socket server, and obviously things get even better if we could take advantage of all the cores in our CPU (for example via the multiprocessing module, os.fork(), etc.).

I'm now trying to build a multi-core socket server demo, with an asynchronous socket server listening on each core and all of them bound to one port: simply create an async server, then os.fork(), and let the processes accept connections competitively.

However, the code that works fine on a single core runs into trouble when I try to fork. There seems to be a problem with registering the same file descriptors from different processes in the epoll selector module.

I'm showing some code below, can anyone help me out?


Here's a simple, logically clear echo server using asyncio:

import os
import asyncio  # optionally: import uvloop
from socket import *

# handler sends the incoming message straight back
async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
async def create_server(loop):
    sock = socket(AF_INET, SOCK_STREAM)
    sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever a connection is accepted, create a handler task in the event loop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))
loop.create_task(serving(loop, sock))
loop.run_forever()

It works fine until I try to fork, after the socket is bound and before the server starts serving. (The same logic works fine in synchronous, threading-based code.)


When I'm trying this:

loop = asyncio.get_event_loop()
sock = loop.run_until_complete(create_server(loop))

from multiprocessing import cpu_count
for num in range(cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork so that the number of processes
        break               # equals the number of CPU cores

loop.create_task(serving(loop, sock))
loop.run_forever()

Theoretically, the forked processes are bound to the same socket and run in the same event loop, so shouldn't they just work?

However I'm getting these error messages:

Task exception was never retrieved
future: <Task finished coro=<serving() done, defined at /home/new/LinuxDemo/temp1.py:21> exception=FileExistsError(17, 'File exists')>
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 262, in _add_reader
    key = self._selector.get_key(fd)
  File "/usr/local/lib/python3.7/selectors.py", line 192, in get_key
    raise KeyError("{!r} is not registered".format(fileobj)) from None
KeyError: '6 is not registered'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/test/temp1.py", line 23, in serving
    client ,addr = await loop.sock_accept(sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 525, in sock_accept
    self._sock_accept(fut, False, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 538, in _sock_accept
    self.add_reader(fd, self._sock_accept, fut, True, sock)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 335, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/usr/local/lib/python3.7/asyncio/selector_events.py", line 265, in _add_reader
    (handle, None))
  File "/usr/local/lib/python3.7/selectors.py", line 359, in register
    self._selector.register(key.fd, poller_events)
FileExistsError: [Errno 17] File exists

Python version: 3.7.3.

I'm totally confused about what's going on.

Could anybody help? Thanks!

  • this is a known issue with the event loop: https://bugs.python.org/issue21998 – georgexsh Jun 01 '19 at 19:44
  • Thanks, seems like things went wrong because I'm sharing the event loop between processes. So is there any other method by which we can benefit from multiple cores to build a socket server in Python? – AdamHommer Jun 01 '19 at 19:57
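Regarding the follow-up question in the comment above: one commonly suggested alternative that shares nothing between processes is SO_REUSEPORT. This is an untested sketch (Linux 3.9+ and modern BSDs only; the port number and helper names are illustrative): every process creates its own listening socket bound to the same port, and the kernel load-balances incoming connections across them.

```python
import asyncio
import os
import socket

def make_listener(port=25000):
    # Each process creates its OWN socket. With SO_REUSEPORT, several
    # sockets may bind the same address, and the kernel distributes
    # incoming connections across all of them.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    sock.bind(('', port))
    sock.listen()
    sock.setblocking(False)
    return sock

async def echo(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

async def serve(loop, sock):
    while True:
        client, _ = await loop.sock_accept(sock)
        loop.create_task(echo(loop, client))

def main():
    # Fork first; nothing asyncio-related is shared between processes.
    for _ in range(os.cpu_count() - 1):
        if os.fork() == 0:
            break
    loop = asyncio.new_event_loop()  # a fresh loop per process
    sock = make_listener()           # a fresh socket per process
    loop.create_task(serve(loop, sock))
    loop.run_forever()

# call main() to start the server
```

Since each process owns both its socket and its loop, no file descriptor is ever registered in two epoll instances, which sidesteps the FileExistsError entirely.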

1 Answer


According to the tracker issue, it is not supported to fork an existing asyncio event loop and attempt to use it from multiple processes. However, according to Yury's comment on the same issue, multi-processing can be implemented by forking before starting a loop, therefore running fully independent asyncio loops in each child.

Your code actually confirms this possibility: while create_server is async def, it doesn't await anything, nor does it use the loop argument. So we can implement Yury's approach by making create_server a regular function, removing the loop argument, calling it before os.fork(), and only running event loops after forking:

import os, asyncio, socket, multiprocessing

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

# create tcp server
def create_server():
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', 25000))
    sock.listen()
    sock.setblocking(False)
    return sock

# whenever a connection is accepted, create a handler task in the event loop
async def serving(loop, sock):
    while True:
        client, addr = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

sock = create_server()

for num in range(multiprocessing.cpu_count() - 1):
    pid = os.fork()
    if pid <= 0:            # fork so that the number of processes
        break               # equals the number of CPU cores

loop = asyncio.get_event_loop()
loop.create_task(serving(loop, sock))
loop.run_forever()
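The fork-before-loop pattern above can also be expressed with multiprocessing.Process instead of a bare os.fork() loop. This is a sketch under the assumption of the 'fork' start method (the default on Linux), so the children inherit the already-bound socket; the worker/main structure is illustrative, not from the original answer.

```python
import asyncio
import multiprocessing
import socket

def create_server(port=25000):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('', port))
    sock.listen()
    sock.setblocking(False)
    return sock

async def handler(loop, client):
    with client:
        while True:
            data = await loop.sock_recv(client, 64)
            if not data:
                break
            await loop.sock_sendall(client, data)

async def serving(loop, sock):
    while True:
        client, _ = await loop.sock_accept(sock)
        loop.create_task(handler(loop, client))

def worker(sock):
    # Each process starts its own, fresh event loop AFTER the fork,
    # which is exactly what the tracker issue requires.
    loop = asyncio.new_event_loop()
    loop.create_task(serving(loop, sock))
    loop.run_forever()

def main():
    sock = create_server()
    ctx = multiprocessing.get_context('fork')  # children must inherit sock
    for _ in range(multiprocessing.cpu_count() - 1):
        ctx.Process(target=worker, args=(sock,), daemon=True).start()
    worker(sock)  # the parent serves too

# call main() to start the server
```

The key property is the same as in the answer's code: the socket is created once in the parent, but every event loop is created only after the processes have split, so no loop state is ever shared.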
  • Thanks, perfectly solved. Since PyPy 7.1.1 fully supports asyncio, I got a benchmark of about 800k echoes per second from my eight-core platform. Fabulous! But regretfully PyPy runs into trouble switching the event loop to the uvloop policy – AdamHommer Jun 02 '19 at 09:17
  • Don't know if it's caused by me doing something wrong. – AdamHommer Jun 02 '19 at 09:18
  • @LeeRoermond It's hard to tell how well debugged the asyncio/uvloop combination is. But to be on the safe side with what you're doing, I'd move even the *import* of `asyncio` (and `uvloop` and anything gtk-related) to after the call to `fork()`. In other words, not only should the parent not run the event loop, it shouldn't even import asyncio/uvloop. – user4815162342 Jun 02 '19 at 17:18
  • Thanks for the reply. I've tested the latest version of uvloop on PyPy (installed via pip), and it seems it couldn't even drive a single-process echo server. Thus it's not a namespace or import problem, as far as I'm concerned. Building from source also failed because of some libev issue. Pity ╮(╯_╰)╭ – AdamHommer Jun 03 '19 at 08:13
  • @LeeRoermond `uvloop` uses Cython and C extensions heavily, which, while nominally supported by PyPy, tends not to work well in practice, and hinders the JIT's ability to reason about the code. When using PyPy, I'd definitely stay with the built-in asyncio. – user4815162342 Jun 03 '19 at 08:54