
I need to create a program that concurrently receives messages from a web socket and from a pipe, and forwards each message to the other channel (when it receives from the socket, it creates a new thread and sends to the pipe; in the same way, when it receives from the pipe, it creates a new thread and sends to the socket).

I have a problem with multithreading: at startup I have to start both the socket_receiver and pipe_receiver methods, but only pipe_receiver starts. I tried removing all the other code and keeping only socket_receiver and pipe_receiver, but the program only enters the `while True` loop of pipe_receiver.

import asyncio
import sys
import json
from concurrent.futures.thread import ThreadPoolExecutor
import websockets

# make the Pool of workers
executor = ThreadPoolExecutor(max_workers=10)
# Make connection to socket and pipe
header = {"Authorization": r"Basic XXXX="}
connection = websockets.connect('wss://XXXXXXXX', extra_headers=header)


async def socket_receiver():
    """Listening from web socket"""
    async with connection as web_socket:
        while True:
            message = await web_socket.recv()
            # send the message to the pipe in a new thread
            executor.submit(send_to_pipe(message))


async def pipe_receiver():
    """Listening from pipe"""
    while True:
        message = sys.stdin.readline()
        if not message:
            break
        executor.submit(send_to_socket(message))
        # jsonValue = json.dump(str(line), file);
        sys.stdout.flush()


def send_to_pipe(message):
    # Check if message is CAM or DENM
    json_message = json.loads(message)
    type = int(json_message["header"]["messageID"])
    # 1 is DENM message, 2 is CAM message
    if type == 1  or type == 2:
        # send the message to the pipe
        sys.stdout.print(json_message);


async def send_to_socket(message):
     async with connection as web_socket:
        json_message = json.dumps(message)
        await web_socket.send(json_message)


asyncio.get_event_loop().run_until_complete(
    asyncio.gather(socket_receiver(),pipe_receiver()))

This program is started as a subprocess; the parent process communicates with it through pipes connected to its stdout and stdin.
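
For context, the parent side presumably looks roughly like this hypothetical sketch (relay.py and the message shape are placeholders, not from my actual code):

import subprocess
import sys

# hypothetical parent process; the real code is not shown here
child = subprocess.Popen(
    [sys.executable, 'relay.py'],
    stdin=subprocess.PIPE,
    stdout=subprocess.PIPE,
)
# write a line to the child's stdin and flush so it isn't stuck in a buffer
child.stdin.write(b'{"header": {"messageID": 2}}\n')
child.stdin.flush()
# read one line the child forwarded from the web socket
line = child.stdout.readline()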

UPDATE: I get this exception with @Martijn Pieters' code:

Traceback (most recent call last):
  File "X", line 121, in <module>
    main()
  File "X", line 119, in main
    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
  File "X\AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 568, in run_until_complete
    return future.result()
  File "X", line 92, in connect_pipe
    reader, writer = await stdio()
  File "X", line 53, in stdio
    lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)
  File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 1421, in connect_read_pipe
    transport = self._make_read_pipe_transport(pipe, protocol, waiter)
  File "X/AppData\Local\Programs\Python\Python37-32\lib\asyncio\base_events.py", line 433, in _make_read_pipe_transport
    raise NotImplementedError
NotImplementedError
  • Shouldn't the `websocket.connect()` call be *inside the `socket_receiver()` coroutine*? – Martijn Pieters Sep 07 '18 at 09:40
  • The [project documentation](https://github.com/aaugustin/websockets) certainly seems to suggest so. – Martijn Pieters Sep 07 '18 at 09:41
  • See the [*Both* pattern in the intro section of the `websockets` documentation](https://websockets.readthedocs.io/en/stable/intro.html#both) on how to create a websocket connection that you both send and receive on. – Martijn Pieters Sep 07 '18 at 09:45
  • I put it inside `socket_receiver` but the problem is the same; furthermore, I also have to send messages, so I'd have to open the socket outside – luca Sep 07 '18 at 10:07
  • `sys.stdout.print` is not a function, and you are currently 'printing' raw Python objects (wouldn't that need to be re-encoded to JSON?). You can't just use `sys.stdout` and `sys.stdin` in non-blocking fashion either. – Martijn Pieters Sep 07 '18 at 10:13
  • I read the doc, but my problem is that socket_receiver doesn't start; only pipe_receiver starts. There is some problem with the use of gather. After resolving this problem I can think about the consumers that process the messages – luca Sep 07 '18 at 10:27
  • Both *start*, but your socket receiver can't do anything with that socket, it'll have been closed by the `async with` context in `send_to_socket()`. – Martijn Pieters Sep 07 '18 at 10:35
  • Also, `Executor.submit()` expects a callable, *not a coroutine*, so `executor.submit(send_to_socket(message))` simply fails as calling a coroutine can't work. – Martijn Pieters Sep 07 '18 at 10:41

1 Answer


You are not using the ThreadPoolExecutor correctly, and you really don't want to use that here. Instead, you need to set up consumers and producers to handle your socket and pipe with queues to send messages between them.

  • for each connection type, create a coroutine that creates the connection, then passes that single connection to both a consumer task and a producer task (created with asyncio.create_task()) for that connection. Use asyncio.wait() to run both tasks with return_when=asyncio.FIRST_COMPLETED, so you can cancel whichever is still running when one of the two completes 'early' (e.g. has failed).

  • Use a queue to pass messages from the consumer of one, to the producer of the other connection.

  • sys.stdin and sys.stdout are blocking streams, don't just read and write to them! See https://gist.github.com/nathan-hoad/8966377 for a gist attempting to set up non-blocking STDIO streams, and this asyncio issue that asks for a non-blocking streams feature.

  • Don't use a global socket connection, certainly not with two separate async with statements. Your send_to_socket() method would actually close the socket because the async with connection as web_socket: context manager exits when the first message is sent, and this then causes issues for the socket_receiver code which assumes the socket remains open indefinitely.

  • Don't use threading here! Your connections are entirely managed by asyncio, threading would stomp majorly on this.

  • concurrent.futures.Executor instances (such as your ThreadPoolExecutor) should only be used with regular callables, not with coroutines. Executor.submit() states it takes a callable; passing in a coroutine with executor.submit(send_to_pipe(message)) or executor.submit(send_to_socket(message)) causes an exception to be raised, as coroutines are not callables. You are probably not seeing an exception message because that exception is raised in the other thread.

    This is the reason your socket_receiver() coroutine fails; it certainly starts but attempts to send messages fail. When I run your code against a local mocked-up websocket server a warning is printed:

    RuntimeWarning: coroutine 'send_to_socket' was never awaited
      executor.submit(send_to_socket(message))
    

    When a coroutine is not awaited, the code in that coroutine is never executed. Wrapping the submitted object in a function that catches the exception and prints it to stderr (`try: callable()`, `except Exception: traceback.print_exc(file=sys.stderr)`), you get:

    Traceback (most recent call last):
      File "soq52219672.py", line 15, in log_exception
        callable()
    TypeError: 'coroutine' object is not callable
    
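    For reference, a minimal reconstruction of that wrapper (the log_exception name comes from the traceback above; how it was wired up is my assumption):

    import sys
    import traceback

    def log_exception(callable):
        # run the submitted "callable" and print any exception to
        # stderr, so errors raised in the executor thread are visible
        try:
            callable()
        except Exception:
            traceback.print_exc(file=sys.stderr)

    # submitted as executor.submit(log_exception, send_to_socket(message));
    # the coroutine object is not callable, so callable() raises the
    # TypeError shown above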

Executors should only be used to integrate code that can't be converted to using coroutines; the executor manages that code so it runs parallel to the asyncio tasks without interference. If that code needs to interact with asyncio tasks, always use asyncio.run_coroutine_threadsafe() or loop.call_soon_threadsafe() to call across the boundary. See the Concurrency and multithreading section of the asyncio documentation.
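
As a rough illustration of crossing that boundary (the names handle and blocking_worker are mine, not from the question):

import asyncio

async def handle(message):
    # coroutine that must run on the asyncio event loop
    print('handled:', message)

def blocking_worker(loop):
    # runs in an executor thread; schedule a coroutine on the loop
    # and block this thread on the returned concurrent.futures.Future
    future = asyncio.run_coroutine_threadsafe(handle('from thread'), loop)
    future.result()

async def main():
    loop = asyncio.get_running_loop()
    # run the blocking function in the default executor, alongside asyncio tasks
    await loop.run_in_executor(None, blocking_worker, loop)

asyncio.run(main())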

Here is an example of how I'd rewrite your code to use the consumer/producer pattern, with stdio() based on the Nathan Hoad gist on the subject, plus a fallback for Windows where support for treating stdio as pipes is limited:

import asyncio
import json
import os
import sys

import websockets

async def socket_consumer(socket, outgoing):
    # take messages from the web socket and push them into the queue
    async for message in socket:
        await outgoing.put(message)

async def socket_producer(socket, incoming):
    # take messages from the queue and send them to the socket
    while True:
        message = await incoming.get()
        jsonmessage = json.dumps(message)
        await socket.send(jsonmessage)

async def connect_socket(incoming, outgoing):
    header = {"Authorization": r"Basic XXXX="}
    uri = 'wss://XXXXXXXX'
    async with websockets.connect(uri, extra_headers=header) as websocket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.create_task(socket_consumer(websocket, outgoing))
        producer_task = asyncio.create_task(socket_producer(websocket, incoming))

        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED
        )
        for task in pending:
            task.cancel()
        # force a result check; if there was an exception it'll be re-raised
        for task in done:
            task.result()


# pipe support
async def stdio(loop=None):
    if loop is None:
        loop = asyncio.get_running_loop()

    if sys.platform == 'win32':
        # no support for asyncio stdio yet on Windows, see https://bugs.python.org/issue26832
        # use an executor to read from stdio and write to stdout
        class Win32StdinReader:
            def __init__(self):
                self.stdin = sys.stdin.buffer
            async def readline(self):
                # a single call to sys.stdin.readline() is thread-safe
                return await loop.run_in_executor(None, self.stdin.readline)

        class Win32StdoutWriter:
            def __init__(self):
                self.buffer = []
                self.stdout = sys.stdout.buffer
            def write(self, data):
                self.buffer.append(data)
            async def drain(self):
                data, self.buffer = self.buffer, []
                # a single call to writelines() on the stdout buffer is thread-safe
                return await loop.run_in_executor(None, self.stdout.writelines, data)

        return Win32StdinReader(), Win32StdoutWriter()

    reader = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader),
        sys.stdin
    )

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin,
        os.fdopen(sys.stdout.fileno(), 'wb')
    )
    writer = asyncio.streams.StreamWriter(writer_transport, writer_protocol, None, loop)

    return reader, writer

async def pipe_consumer(pipereader, outgoing):
    # take messages from the pipe and push them into the queue
    while True:
        message = await pipereader.readline()
        if not message:
            break
        await outgoing.put(message.decode('utf8'))

async def pipe_producer(pipewriter, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        jsonmessage = await incoming.get()
        message = json.loads(jsonmessage)
        type = int(message.get('header', {}).get('messageID', -1))
        # 1 is DENM message, 2 is CAM message
        if type in {1, 2}:
            pipewriter.write(jsonmessage.encode('utf8') + b'\n')
            await pipewriter.drain()

async def connect_pipe(incoming, outgoing):
    reader, writer = await stdio()
    # create tasks for the consumer and producer. The asyncio loop will
    # manage these independently
    consumer_task = asyncio.create_task(pipe_consumer(reader, outgoing))
    producer_task = asyncio.create_task(pipe_producer(writer, incoming))

    # start both tasks, but have the loop return to us when one of them
    # has ended. We can then cancel the remainder
    done, pending = await asyncio.wait(
        [consumer_task, producer_task],
        return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()
    # force a result check; if there was an exception it'll be re-raised
    for task in done:
        task.result()

async def main():
    pipe_to_socket = asyncio.Queue()
    socket_to_pipe = asyncio.Queue()

    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket)

    await asyncio.gather(socket_coro, pipe_coro)

if __name__ == '__main__':
    asyncio.run(main())

This starts with two tasks, one to manage the socket, the other to manage the STDIO pipe. Each of these starts two more tasks of its own, for its consumer and its producer. The two queues carry the messages from the consumer of one connection to the producer of the other.

  • I fixed it with a new Ubuntu virtual machine; the only method that doesn't work is pipe_producer. socket_consumer receives the JSON, but pipe_producer keeps waiting on `await incoming.get()` – luca Sep 12 '18 at 07:01
  • @luca is the parent process using buffering when writing to the pipe? Make sure to flush! – Martijn Pieters Sep 12 '18 at 07:20
  • the object is put in the queue by `socket_consumer` (received from the socket), but `pipe_producer` doesn't get past `incoming.get()`; I don't think the parent process is the problem – luca Sep 12 '18 at 08:18
  • @luca I can’t reproduce this. The queues are well tested. Are you certain you are not missing a get for a message with an invalid header? I didn’t include JSON parsing error handling either, it may be prudent to add a try...except there and log issues. – Martijn Pieters Sep 12 '18 at 08:39
  • I opened a new question about this specific problem [here](https://stackoverflow.com/questions/52291218/python-asyncio-queue-get-doesnt-receive-the-message) – luca Sep 12 '18 at 08:55