1

I'm implementing a producer-consumer program in a server using sockets and asyncio. The problem is that the async function sock_recv() does not seem to work properly when used with a socket wrapped in an SSL connection. The following code, which uses a plain (non-SSL) socket, works.

Server side

import asyncio
import random
import socket
import ssl

# Loopback endpoint the demo server binds to; SERVER_ADDRESS is the
# (host, port) pair in the form socket.bind() expects.
HOST = "127.0.0.1"
PORT = 8881
SERVER_ADDRESS = (HOST, PORT)

async def producer(queue, client_connection, event_loop):
    """Put one random integer on *queue* for every chunk received from the client.

    Args:
        queue: ``asyncio.Queue`` that receives the produced integers.
        client_connection: Non-blocking socket to read from.
        event_loop: Event loop whose ``sock_recv()`` performs the read.

    Returns when the peer closes the connection, i.e. when ``sock_recv()``
    yields an empty bytes object.
    """
    while True:
        print("Waiting for sock_recv")
        data = await event_loop.sock_recv(client_connection, 4096)
        if not data:
            # An empty read means EOF. Without this check the loop would spin
            # forever on a closed socket and flood the queue.
            break
        r = random.randint(1, 101)
        print("Produced: %d" % r)
        await queue.put(r)
        # Yield to the loop so the consumer gets a chance to run.
        await asyncio.sleep(0)

async def consumer(queue):
    """Take integers off *queue* forever, simulating two seconds of work per item.

    Args:
        queue: ``asyncio.Queue`` of integers to consume.

    The coroutine never returns on its own; cancel its task to stop it.
    """
    while True:
        # Message typo fixed ("Wating" -> "Waiting").
        print("Waiting for queue.get()")
        r = await queue.get()
        await asyncio.sleep(2)
        print("Consumed: %d" % r)

async def main():
    """Accept a single TCP client and run a producer/consumer pair on its socket.

    Binds a listening socket to ``SERVER_ADDRESS``, loads the server
    certificate, accepts one connection, switches it to non-blocking mode
    and then waits on the producer and consumer tasks. Both sockets are
    closed when the coroutine exits, including on cancellation.
    """
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as listen_socket:
        listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        listen_socket.bind(SERVER_ADDRESS)
        listen_socket.listen(5)
        ssl_context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
        ssl_context.load_cert_chain(certfile="certificate.pem", keyfile="key.pem")
        # accept() blocks the event loop, which is tolerable here only because
        # no other task has been scheduled yet.
        client_connection, _client_address = listen_socket.accept()
        # client_connection = ssl_context.wrap_socket(
        #     client_connection, server_side=True
        # )
        with client_connection:
            client_connection.setblocking(False)
            queue = asyncio.Queue()
            # Inside a coroutine, get_running_loop() is the supported way to
            # obtain the loop that is actually executing this code.
            loop = asyncio.get_running_loop()
            t1 = asyncio.create_task(producer(queue, client_connection, loop))
            t2 = asyncio.create_task(consumer(queue))
            await asyncio.wait([t1, t2])

# asyncio.run() creates and closes its own event loop, so no loop is fetched
# here. The guard keeps a plain import of this module from starting the server.
if __name__ == "__main__":
    asyncio.run(main())

Client side

import socket
# Send one message to the demo server. The context manager closes the socket
# even if connect() or sendall() raises.
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
    s.connect(("127.0.0.1", 8881))
    s.sendall(b"Hello")

Output

Waiting for sock_recv
Waiting for queue.get()
Produced: 49
Waiting for sock_recv
Consumed: 49
Waiting for queue.get()

Here's the problem: when I uncomment the following part,

# client_connection = ssl_context.wrap_socket(
#     client_connection, server_side=True
# )

It blocks on the sock_recv() function. With the uncommented code, I get the following output:

Output

Waiting for sock_recv
Waiting for queue.get()

Client Code

import socket
import ssl

# ssl.wrap_socket() is deprecated since Python 3.7 and removed in 3.12, so the
# socket is wrapped through an explicit SSLContext instead.
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# NOTE: certificate verification is disabled to keep the behaviour of the old
# module-level wrap_socket() default. This is only acceptable for local
# testing; verify the server certificate in real deployments.
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

# The wrapped socket owns the underlying connection; the context manager
# closes it even if connect() or sendall() raises.
with context.wrap_socket(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
    sock.connect(("127.0.0.1", 8881))
    sock.sendall(b"Hello")

Finally, when I shut down the server with Ctrl-C, I get the following output:

^CTask exception was never retrieved
future: <Task finished coro=<producer() done, defined at asyncio_test.py:8> exception=SSLWantReadError(2, 'The operation did not complete (read) (_ssl.c:2488)')>
Traceback (most recent call last):
  File "asyncio_test.py", line 11, in producer
    await event_loop.sock_recv(client_connection, 4096)
  File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/selector_events.py", line 352, in sock_recv
    return await fut
  File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/selector_events.py", line 366, in _sock_recv
    data = sock.recv(n)
  File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/ssl.py", line 1037, in recv
    return self.read(buflen)
  File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/ssl.py", line 913, in read
    return self._sslobj.read(len)
ssl.SSLWantReadError: The operation did not complete (read) (_ssl.c:2488)
Traceback (most recent call last):
  File "asyncio_test.py", line 42, in <module>
    asyncio.run(main())
  File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py", line 571, in run_until_complete
    self.run_forever()
  File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py", line 539, in run_forever
    self._run_once()
  File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/asyncio/base_events.py", line 1739, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/coverfox/.pyenv/versions/3.7.3/lib/python3.7/selectors.py", line 468, in select
    fd_event_list = self._selector.poll(timeout, max_ev)
KeyboardInterrupt

Edit: I just found out that it works if I pass do_handshake_on_connect=False in the wrap_socket() function in the client code, but then ssl won't work.

Akshay Takkar
  • 500
  • 1
  • 7
  • 21

1 Answer

1

So, it turns out that the async function sock_recv() does not support SSLSocket, since SSL needs to write to a user-space buffer, as described in this SO answer.
The way to work around this issue is to use the transports or streams in asyncio. Here is a working version of the code from the question above.

import asyncio
import random
import socket
import ssl

# Loopback endpoint the demo server listens on; SERVER_ADDRESS bundles the
# same host and port as a single (host, port) tuple.
HOST = "127.0.0.1"
PORT = 8881
SERVER_ADDRESS = (HOST, PORT)

async def producer(reader, writer, queue):
    """Put one random integer on *queue* for every chunk read from *reader*.

    Args:
        reader: ``asyncio.StreamReader`` for the client connection.
        writer: ``asyncio.StreamWriter`` for the same connection (unused here).
        queue: ``asyncio.Queue`` that receives the produced integers.

    Returns when the stream reaches EOF, i.e. when ``reader.read()`` yields
    an empty bytes object.
    """
    while True:
        print("Waiting for sock_recv")
        data = await reader.read(16)
        if not data:
            # An empty read means EOF. Without this check the loop would spin
            # forever after the client disconnects and flood the queue.
            break
        r = random.randint(1, 101)
        print("Produced: %d" % r)
        await queue.put(r)
        # Yield to the loop so the consumer gets a chance to run.
        await asyncio.sleep(0)

async def consumer(queue):
    """Take integers off *queue* forever, simulating two seconds of work per item.

    Args:
        queue: ``asyncio.Queue`` of integers to consume.

    Every item is acknowledged with ``queue.task_done()`` so callers can use
    ``queue.join()`` to wait for the backlog to be processed. The coroutine
    never returns on its own; cancel its task to stop it.
    """
    while True:
        # Message typo fixed ("Wating" -> "Waiting").
        print("Waiting for queue.get()")
        r = await queue.get()
        try:
            await asyncio.sleep(2)
            print("Consumed: %d" % r)
        finally:
            # Acknowledge even on cancellation so queue.join() cannot hang
            # on an item that was already taken off the queue.
            queue.task_done()


async def set_up_producer_consumer(reader, writer):
    """Run a producer/consumer pair for one client connection.

    Args:
        reader: ``asyncio.StreamReader`` for the connection.
        writer: ``asyncio.StreamWriter`` for the connection.

    Waits until either task finishes and re-raises its failure, if any.
    After a clean producer exit the queued items are drained. The
    connection is always closed and both tasks are always cancelled on
    the way out.
    """
    queue = asyncio.Queue()
    producer_task = asyncio.create_task(producer(reader, writer, queue))
    consumer_task = asyncio.create_task(consumer(queue))
    try:
        done, _pending = await asyncio.wait(
            {producer_task, consumer_task}, return_when=asyncio.FIRST_COMPLETED
        )
        for task in done:
            # Surfaces an exception instead of leaving it unretrieved.
            task.result()
        # The producer ended cleanly; let the consumer work off the backlog.
        await queue.join()
    finally:
        writer.close()
        for task in (producer_task, consumer_task):
            task.cancel()
        # Wait for the cancellations to take effect; failures were already
        # re-raised above, so the outcomes are only collected here.
        await asyncio.gather(producer_task, consumer_task, return_exceptions=True)

async def main():
    """Start the TLS server on ``HOST``:``PORT`` and serve clients until cancelled.

    Each accepted connection is handed to ``set_up_producer_consumer``.
    The server is closed when this coroutine exits, including on
    cancellation.
    """
    ssl_context = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
    ssl_context.load_cert_chain(certfile="certificate.pem", keyfile="key.pem")

    server = await asyncio.start_server(
        set_up_producer_consumer,
        HOST,
        PORT,
        family=socket.AF_INET,
        ssl=ssl_context,
        reuse_address=True,
    )
    # serve_forever() is the documented way to keep accepting connections;
    # "async with" closes the listening sockets on the way out.
    async with server:
        await server.serve_forever()

# asyncio.run() creates and closes its own event loop, so no loop is fetched
# here. The guard keeps a plain import of this module from starting the server.
if __name__ == "__main__":
    asyncio.run(main())
Akshay Takkar
  • 500
  • 1
  • 7
  • 21