1

I post a new question related the old for a problem with the get from queue. This is the code (thanks to Martijn Pieters)

import asyncio
import sys
import json
import os
import websockets


async def socket_consumer(socket, outgoing):
    # take messages from the web socket and push them into the queue
    async for message in socket:
        await outgoing.put(message)
        file = open(r"/home/host/Desktop/FromSocket.txt", "a")
        file.write("From socket: " + ascii(message) + "\n")
        file.close()


async def socket_producer(socket, incoming):
    # take messages from the queue and send them to the socket
    while True:
        message = await incoming.get()
        file = open(r"/home/host/Desktop/ToSocket.txt", "a")
        file.write("To socket: " + ascii(message) + "\n")
        file.close()
        await socket.send(message)


async def connect_socket(incoming, outgoing, loop=None):
    header = {"Authorization": r"Basic XXX="}
    uri = 'XXXXXX'
    async with websockets.connect(uri, extra_headers=header) as web_socket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.ensure_future(
            socket_consumer(web_socket, outgoing), loop=loop)
        producer_task = asyncio.ensure_future(
            socket_producer(web_socket, incoming), loop=loop)

        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()


# pipe support
async def stdio(loop=None):
    if loop is None:
        loop = asyncio.get_event_loop()

    reader = asyncio.StreamReader()
    await loop.connect_read_pipe(
        lambda: asyncio.StreamReaderProtocol(reader), sys.stdin)

    writer_transport, writer_protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, os.fdopen(sys.stdout.fileno(), 'wb'))
    writer = asyncio.streams.StreamWriter(
        writer_transport, writer_protocol, None, loop)

    return reader, writer


async def pipe_consumer(pipe_reader, outgoing):
    # take messages from the pipe and push them into the queue
    while True:
        message = await pipe_reader.readline()
        if not message:
            break
        file = open(r"/home/host/Desktop/FromPipe.txt", "a")
        file.write("From pipe: " + ascii(message.decode('utf8')) + "\n")
        file.close()

        await outgoing.put(message.decode('utf8'))


async def pipe_producer(pipe_writer, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        json_message = await incoming.get()
        file = open(r"/home/host/Desktop/ToPipe.txt", "a")
        file.write("Send to pipe message: " + ascii(json_message) + "\n")
        file.close()
        try:
            message = json.loads(json_message)
            message_type = int(message.get('header', {}).get('messageID', -1))

        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not
            # a dictionary, or the messageID was convertable to an integer
            message_type = None
            file = open(r"/home/host/Desktop/Error.txt", "a")
            file.write(" Error \n")
            file.close()
        # 1 is DENM message, 2 is CAM message
        file.write("Send to pipe type: " + type)
        if message_type in {1, 2}:
            file.write("Send to pipe: " + json_message)
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()


async def connect_pipe(incoming, outgoing, loop=None):
    reader, writer = await stdio()
    # create tasks for the consumer and producer. The asyncio loop will
    # manage these independently
    consumer_task = asyncio.ensure_future(
        pipe_consumer(reader, outgoing), loop=loop)
    producer_task = asyncio.ensure_future(
        pipe_producer(writer, incoming), loop=loop)

    # start both tasks, but have the loop return to us when one of them
    # has ended. We can then cancel the remainder
    done, pending = await asyncio.wait(
        [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # force a result check; if there was an exception it'll be re-raised
    for task in done:
        task.result()


def main():
    loop = asyncio.get_event_loop()
    pipe_to_socket = asyncio.Queue(loop=loop)
    socket_to_pipe = asyncio.Queue(loop=loop)

    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe, loop=loop)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket, loop=loop)

    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))

main()

This code is the child process called from the parent by

subprocess.Popen(["python3", test], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)

The problem is the object is in the queue by socket_consumer (received from the socket) but the pipe_producer doesn't go ahead from incoming.get(). The file writing is only for testing purpose.

The parent at the moment is this (only for test)

test = r"/home/host/PycharmProjects/Tim/Tim.py"
process = subprocess.Popen(["python3", test],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)

for i in range(5):
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}';
    jsonValueBytes = message.encode("utf-8")
    process.stdin.write(jsonValueBytes + b"\n")

process.stdin.close()
process.wait()

Instead to send to web socket I'm using this code:

#!/usr/bin/env python

import asyncio
import websockets

async def hello(uri):
    header = {"Authorization": r"Basic XXXX="}
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1,"camParameters":{"basicContainer":{"stationType":5,"referencePosition":{"latitude":451114425,"longitude":76720957,"positionConfidenceEllipse":{"semiMajorConfidence":4095,"semiMinorConfidence":4095,"semiMajorOrientation":3601},...other fields}}';
    async with websockets.connect(uri, extra_headers=header) as websocket:
        await websocket.send(message)


asyncio.get_event_loop().run_until_complete(
    hello('XXX'))

It send through pipe and works because I receive on pipe and send to the socket (the files FromPipe.txt. and ToSocket.txt are right).
Then I have code to send to a server with an opened web socket and this server send the message to the child. When the child receive from the socket the file FromSocket.txt is created but the ToPipe.txt in not created until I put it before the awit incoming.get()

FromSocket.txt has this content:

From socket: '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1, ... other field}}'

But if there was a problem on type retrieving than it would create the file since it is the first instruction after the json_message = await incoming.get() I think is a problem with the queue. For test I put the incoming.get() in socket_consumer after the await outgoing.put(message) and it works.

UPDATE: If I run only the child (so without pipe) the ToPipe.txt is right and the message transfert from socket to pipe is fine. For my test I run the parent, it sends to pipe one message that the child sends to the socket, then I send a message to the socket and the child catchs this message but it doesn't send to the pipe and the ToPipe.txt is not created. Maybe there is a problem in the main method

luca
  • 3,248
  • 10
  • 66
  • 145
  • How are you reading and writing from and to the parent process Popen? Are you using `select` or multiple threads? Can you try with [`pexpect`](https://pexpect.readthedocs.io/en/stable/) instead? It would make it much easier to ensure that it's not the parent process that's the issue here, as [`pexpect` handles all the non-blocking reading and writing details for you](https://github.com/pexpect/pexpect/blob/master/pexpect/popen_spawn.py). – Martijn Pieters Sep 12 '18 at 09:41
  • Next, we'll need more detail on the actual messages being exchanged here, to make sure there isn't a problem with the way `pipe_producer` parses the messages. can you show the `ToPipe.txt` output you get together with the data the parent process and the websocket are sending and receiving? – Martijn Pieters Sep 12 '18 at 09:44
  • I added futher details – luca Sep 12 '18 at 10:02
  • You never read from the pipe stdin. This can cause the whole I/O layer to stall entirely, and your writes *never make it to the child process*. Please, do use `pexpect` here to avoid problems like those. – Martijn Pieters Sep 12 '18 at 10:50
  • I've outlined in my answer below what else is going wrong. At issue is that you never specified what kind of data the pipe and the websocket send and receive; I presumed you had those parts already figured out. – Martijn Pieters Sep 12 '18 at 15:33

1 Answers1

2

You are writing double-encoded JSON to the child process:

message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}';
jsonValue = json.dumps(message)

message is already a JSON string, so jsonValue is a double-encoded JSON string.

The pipe consumer pushes this double-encoded string into the queue for the socket. Next, the websocket producer in socket_producer() encodes the message again:

while True:
    message = await incoming.get()
    # ...
    json_message = json.dumps(message)
    await socket.send(json_message)

So now json_message is a triple-encoded JSON value, a JSON document containing a JSON document containing a JSON document:

>>> import json
>>> message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}'  # valid JSON
>>> json_message = json.dumps(message)
>>> print(json_message)  # double-encoded
"{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}"
>>> json_message = json.dumps(json_message)  # encode *again*
>>> print(json_message)  # triple-encoded
"\"{\\\"header\\\":{\\\"protocolVersion\\\":1,\\\"messageID\\\":2,\\\"stationID\\\":400}}}\""

I don't know exactly what your web socket does with this, but lets assume that it uses json.loads() once, then echoes the decoded message back. This means that socket_consumer() receives the a JSON document that's encoded just twice. Your FromSocket.txt log certainly implies that this is what happens, because it contains a double encoded JSON message:

You can see this in your FromSocket.txt log:

From socket: "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400},\"cam\":{\"generationDeltaTime\":1,...other fields}}"

Note those \" entries, and the whole document is wrapped in quotes, but there are no \\\ triple-backslashes in the value.

Still, this extra layering of JSON encoding breaks the pipe_producer() coroutine, which expects the message to decode to a dictionary, not another string (even if that string contains another JSON document):

message = json.loads(json_message)
type = int(message.get('header', {}).get('messageID', -1))

message will decode to a string instead, so message.get will fail with an AttributeError, causing the coroutine to exit:

>>> json_message = "{\"header\":{\"protocolVersion\":1,\"messageID\":2,\"stationID\":400}}}"  # double encoded
>>> message = json.loads(json_message)
>>> message  # Back one stop, single-encoded JSON
'{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}}'
>>> type(message)  # it's a string with JSON, not a dictionary
<class 'str'>
>>> message.get('header')
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
AttributeError: 'str' object has no attribute 'get'

You need to make sure you do not encode your data too many times! If your pipe receives JSON data, don't encode the data again when sending it to the socket. When sending data to the pipe from the parent process, don't double encode the data, if you already have a JSON string there is no value in passing it through json.dumps() once more.

It would also be prudent to add more fail-safes in the coroutines. I didn't make the JSON decoding robust enough, so lets remedy that part:

async def pipe_producer(pipe_writer, incoming):
    # take messages from the queue and send them to the pipe
    while True:
        json_message = await incoming.get()
        try:
            message = json.loads(json_message)
            type = int(message.get('header', {}).get('messageID', -1))
        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not
            # a dictionary, or the messageID was convertable to an integer
            type = None
        # 1 is DENM message, 2 is CAM message
        if type in {1, 2}:
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()

You probably want to record that the decoding failed somewhere (push messages to a log queue that a separate task picks up to write to a log).

Next, we can update the connect_* functions to not ignore exceptions in the tasks that complete:

done, pending = await asyncio.wait(
    [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
for task in pending:
    task.cancel()
# force a result check; if there was an exception it'll be re-raised
for task in done:
    task.result()

The done.result() check can re-raise the exception thrown in a consumer or producer. Since the connect_* coroutines are run via asyncio.gather(), which in turn is run by loop.run_until_complete(), that exception is then propagated all the way to the main() function, so it'll exit Python and you get to see the traceback printed. I've updated my other answer to include the for task in done: task.result() loop as that's good practice anyway.

With just the task.result() loop in my original answer code, and a websocket that just echoes the message back, and entering a valid JSON document (not double-encoded), I can see the error immediately; the parent process here is my terminal, so I'm just copying in the JSON message into my terminal window to send data into the pipe:

$ python3.7 so52291218.py
{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
Traceback (most recent call last):
  File "so52291218.py", line 140, in <module>
    main()
  File "so52291218.py", line 137, in main
    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))
  File "/.../lib/python3.7/asyncio/base_events.py", line 568, in run_until_complete
    return future.result()
  File "so52291218.py", line 126, in connect_pipe
    task.result()
  File "so52291218.py", line 104, in pipe_producer
    type = int(message.get("header", {}).get("messageID", -1))
AttributeError: 'str' object has no attribute 'get'

When I remove the json.dumps() call from the socket_producer() or I change my websocket server to use json.loads() on the incoming message and sending that out as the result, then the code works and I get the same message echoed back to my terminal.

Note that you can't just use a loop to write to a subprocess.Popen() pipe when both stdin and stdout are pipes. You can trivially cause the childprocess to hang on I/O by only writing in a loop. You'd have to make sure to read from the stdout pipe too, but because the child process is going to read and write from those handles in effectively random order, your parent process would also have to handle the I/O for the Popen() pipes asynchronously.

Rather than write up how to do that (which is already covered elsewhere on Stack Overflow), I'm instead telling you to use the pexpect project, as it already has done all that work for you (by spawning a separate thread that continually reads from the stdout pipe); using pexpect.popen_spawn.PopenSpawn() to keep this close to your original setup would look like this:

import sys
import pexpect

test = '...'
process = pexpect.popen_spawn.PopenSpawn([sys.executable, test])

for i in range(5):
    message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}}';
    jsonValueBytes = message.encode("utf-8")
    process.send(jsonValueBytes + b"\n")

    # echo anything coming back
    while True:
        index = process.expect([process.crlf, pexpect.EOF, pexpect.TIMEOUT], timeout=0.1)
        if not process.before:
            break
        print('>>>', process.before.decode('utf8', errors='replace'), flush=True)

# send EOF to close the pipe, then terminate the process
process.sendeof()
process.kill(1)
process.wait()

So every time we send a full line to the pipe, we also look for lines coming the other way, with a short timeout, and echo any such lines.

With all fixes in place (making sure to avoid multiple-encoding JSON messages), and a very simple echoing websocket server, the pexpect code above prints:

>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}
>>> {"header":{"protocolVersion":1,"messageID":2,"stationID":400}}

Showing that there is a full round-trip path from parent process to child process to websocket and back.

Martijn Pieters
  • 1,048,767
  • 296
  • 4,058
  • 3,343
  • Thanks for the explanation, I removed the json encoding duplication but it still doesn't work. Further the creation of ToPipe.txt file is before the retrieving of type field but it doesn't even create. – luca Sep 13 '18 at 07:27
  • @luca have you also added the `for task in done: task.result()` loop? Because what then probably goes wrong is the decoding in pipe_consumer. – Martijn Pieters Sep 13 '18 at 07:31
  • Do us a favour and use `ascii()` on objects you write to your log files. Share the results of all your log files. I’m still sailing blind here because you don’t share the data sent or received at all four points and only `ascii()` can guarantee we’ll get accurate representations of those objects. – Martijn Pieters Sep 13 '18 at 07:34
  • Your question is getting muddled, and it’s not clear to me what code you are running and what messages are generated. Please edit the question without update sections; imagine it being read by future visitors, they don’t need the whole history. I can always read the post history to se what changed. The goal here is to create a question with W proper [mcve], something that’ll let me recreate the whole issue. – Martijn Pieters Sep 13 '18 at 08:33
  • Code edited with the actual code, It seems to stuck on `json_message = await incoming.get()`. I can't give access to web socket but it returns the same received json – luca Sep 13 '18 at 08:49
  • @luca: I use https://gist.github.com/eafc07d718ce38cdcd178bc4c4e93fae to test with, so `uri` in the answer code is set to `uri = "ws://localhost:8765"`. Your edit didn't include the log file messages. Try to read through your post and in a separate environment with *just your question information* try to re-create your situation. Don't copy anything from anywhere else, unless you also add that extra info to your question. I can't reproduce your problem, and I must assume the problem lies elsewhere. – Martijn Pieters Sep 13 '18 at 08:58
  • I update the main post, maybe there is a problem with asyncio in main method – luca Sep 13 '18 at 10:37
  • 1
    Please follow my advise and switch to using pexpect; you have a hung I/O layer, I think. – Martijn Pieters Sep 13 '18 at 10:58
  • with `process = pexpect.spawn("python3 " + test)` and `process.write(message + "\n")` it works – luca Sep 13 '18 at 12:09