I am posting a new question, related to my previous one, about a problem with getting items from the queue. This is the code (thanks to Martijn Pieters):
import asyncio
import sys
import json
import os
import websockets
async def socket_consumer(socket, outgoing):
    """Forward every message received on the web socket to a queue.

    Iterates ``socket`` asynchronously until the iteration ends. Each message
    is first put on ``outgoing`` and then appended, in ``ascii()`` form, to a
    debug trace file.

    Args:
        socket: Async-iterable object that yields the incoming messages.
        outgoing: Queue-like object whose ``put`` coroutine receives them.
    """
    # take messages from the web socket and push them into the queue
    async for message in socket:
        await outgoing.put(message)
        # Debug trace only. The context manager closes the file even when
        # write() raises, which the manual open()/close() pair did not.
        with open(r"/home/host/Desktop/FromSocket.txt", "a") as trace:
            trace.write("From socket: " + ascii(message) + "\n")
async def socket_producer(socket, incoming):
    """Send every message taken from a queue to the web socket.

    Loops forever: waits for the next item on ``incoming``, appends its
    ``ascii()`` form to a debug trace file, then sends it with
    ``socket.send``. The coroutine only ends when it is cancelled or when one
    of those calls raises.

    Args:
        socket: Object exposing an awaitable ``send(message)`` method.
        incoming: Queue-like object whose ``get`` coroutine yields messages.
    """
    # take messages from the queue and send them to the socket
    while True:
        message = await incoming.get()
        # Debug trace only. The context manager closes the file even when
        # write() raises, which the manual open()/close() pair did not.
        with open(r"/home/host/Desktop/ToSocket.txt", "a") as trace:
            trace.write("To socket: " + ascii(message) + "\n")
        await socket.send(message)
async def connect_socket(incoming, outgoing, loop=None):
    """Bridge the two queues and a web socket connection.

    Opens the web socket, then runs ``socket_consumer`` (socket -> ``outgoing``)
    and ``socket_producer`` (``incoming`` -> socket) as two tasks. As soon as
    either task finishes, the other one is cancelled and the connection is
    closed by leaving the ``async with`` block.

    Args:
        incoming: Queue of messages to send to the socket.
        outgoing: Queue that receives the messages read from the socket.
        loop: Event loop passed to ``asyncio.ensure_future``; ``None`` lets
            asyncio pick the loop.

    Raises:
        Exception: whatever exception ended a finished task is re-raised
            here, mirroring the result check done in ``connect_pipe``.
    """
    header = {"Authorization": r"Basic XXX="}
    uri = 'XXXXXX'
    async with websockets.connect(uri, extra_headers=header) as web_socket:
        # create tasks for the consumer and producer. The asyncio loop will
        # manage these independently
        consumer_task = asyncio.ensure_future(
            socket_consumer(web_socket, outgoing), loop=loop)
        producer_task = asyncio.ensure_future(
            socket_producer(web_socket, incoming), loop=loop)
        # start both tasks, but have the loop return to us when one of them
        # has ended. We can then cancel the remainder
        done, pending = await asyncio.wait(
            [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
        for task in pending:
            task.cancel()
        # force a result check; if there was an exception it'll be re-raised
        # instead of being silently dropped with the finished task
        for task in done:
            task.result()
# pipe support
async def stdio(loop=None):
    """Wrap this process's stdin and stdout in asyncio stream objects.

    Args:
        loop: Event loop used to connect the pipes; when ``None`` the loop
            returned by ``asyncio.get_event_loop()`` is used.

    Returns:
        A ``(reader, writer)`` tuple: an ``asyncio.StreamReader`` fed from
        ``sys.stdin`` and an ``asyncio.streams.StreamWriter`` that writes to
        the file descriptor of ``sys.stdout`` in binary mode.
    """
    loop = asyncio.get_event_loop() if loop is None else loop

    # Read side: hook a fresh StreamReader up to stdin.
    stdin_reader = asyncio.StreamReader()

    def reader_protocol_factory():
        return asyncio.StreamReaderProtocol(stdin_reader)

    await loop.connect_read_pipe(reader_protocol_factory, sys.stdin)

    # Write side: reopen stdout's descriptor as a binary file object.
    stdout_binary = os.fdopen(sys.stdout.fileno(), 'wb')
    transport, protocol = await loop.connect_write_pipe(
        asyncio.streams.FlowControlMixin, stdout_binary)
    stdout_writer = asyncio.streams.StreamWriter(
        transport, protocol, None, loop)

    return stdin_reader, stdout_writer
async def pipe_consumer(pipe_reader, outgoing):
    """Forward every line read from the pipe to a queue.

    Reads lines from ``pipe_reader`` until ``readline()`` returns an empty
    value (end of input), at which point the coroutine returns. Each line is
    decoded as UTF-8, appended in ``ascii()`` form to a debug trace file and
    then put on ``outgoing`` (the trailing newline, if any, is kept).

    Args:
        pipe_reader: Object exposing an awaitable ``readline()`` that returns
            bytes.
        outgoing: Queue-like object whose ``put`` coroutine receives the
            decoded text.

    Raises:
        UnicodeDecodeError: if a line is not valid UTF-8.
    """
    # take messages from the pipe and push them into the queue
    while True:
        raw_line = await pipe_reader.readline()
        if not raw_line:
            # empty read means EOF on the pipe: stop consuming
            break
        # decode once and reuse the text for both the trace and the queue
        message = raw_line.decode('utf8')
        # Debug trace only. The context manager closes the file even when
        # write() raises, which the manual open()/close() pair did not.
        with open(r"/home/host/Desktop/FromPipe.txt", "a") as trace:
            trace.write("From pipe: " + ascii(message) + "\n")
        await outgoing.put(message)
async def pipe_producer(pipe_writer, incoming):
    """Write selected JSON messages taken from a queue to the pipe.

    Loops forever: waits for the next item on ``incoming``, traces it, parses
    it as JSON and reads ``header.messageID``. Only messages whose ID is 1 or
    2 are written to ``pipe_writer`` (UTF-8 encoded, newline terminated);
    everything else is dropped after being traced.

    Args:
        pipe_writer: Object exposing ``write(bytes)`` and an awaitable
            ``drain()``.
        incoming: Queue-like object whose ``get`` coroutine yields the JSON
            text.
    """
    # take messages from the queue and send them to the pipe
    while True:
        json_message = await incoming.get()
        # Debug trace only; written first so the file shows every item that
        # came out of the queue, even if parsing fails afterwards.
        with open(r"/home/host/Desktop/ToPipe.txt", "a") as trace:
            trace.write("Send to pipe message: " + ascii(json_message) + "\n")
        try:
            message = json.loads(json_message)
            message_type = int(message.get('header', {}).get('messageID', -1))
        except (ValueError, TypeError, AttributeError):
            # failed to decode the message, or the message was not
            # a dictionary, or the messageID was not convertible to an integer
            message_type = None
            with open(r"/home/host/Desktop/Error.txt", "a") as error_log:
                error_log.write(" Error \n")
        # 1 is DENM message, 2 is CAM message
        forward = message_type in {1, 2}
        # Reopen the trace file for the remaining lines: the earlier handle is
        # closed, and the value to log is ``message_type``, not the builtin
        # ``type``.
        with open(r"/home/host/Desktop/ToPipe.txt", "a") as trace:
            trace.write("Send to pipe type: " + str(message_type) + "\n")
            if forward:
                trace.write("Send to pipe: " + json_message + "\n")
        if forward:
            pipe_writer.write(json_message.encode('utf8') + b'\n')
            await pipe_writer.drain()
async def connect_pipe(incoming, outgoing, loop=None):
    """Bridge the two queues and this process's stdin/stdout pipes.

    Runs ``pipe_consumer`` (stdin -> ``outgoing``) and ``pipe_producer``
    (``incoming`` -> stdout) as two tasks. As soon as either task finishes,
    the other one is cancelled. In particular, ``pipe_consumer`` returns when
    stdin reaches end of input, so from that moment ``pipe_producer`` is
    cancelled and nothing further is read from ``incoming``.

    Args:
        incoming: Queue of messages to write to stdout.
        outgoing: Queue that receives the lines read from stdin.
        loop: Event loop used for the pipes and tasks; ``None`` lets
            ``stdio`` and asyncio pick the loop.

    Raises:
        Exception: whatever exception ended a finished task is re-raised.
    """
    # hand the caller's loop to stdio() as well, so the pipes and the tasks
    # are not set up against different loops
    reader, writer = await stdio(loop)
    # create tasks for the consumer and producer. The asyncio loop will
    # manage these independently
    consumer_task = asyncio.ensure_future(
        pipe_consumer(reader, outgoing), loop=loop)
    producer_task = asyncio.ensure_future(
        pipe_producer(writer, incoming), loop=loop)
    # start both tasks, but have the loop return to us when one of them
    # has ended. We can then cancel the remainder
    done, pending = await asyncio.wait(
        [consumer_task, producer_task], return_when=asyncio.FIRST_COMPLETED)
    for task in pending:
        task.cancel()
    # force a result check; if there was an exception it'll be re-raised
    for task in done:
        task.result()
def main():
    """Run the socket bridge and the pipe bridge until both have finished.

    Creates the two queues that link the bridges (pipe -> socket and
    socket -> pipe), then drives ``connect_socket`` and ``connect_pipe``
    together on the event loop.
    """
    loop = asyncio.get_event_loop()
    # asyncio.Queue no longer accepts a ``loop`` argument (removed in Python
    # 3.10). On older versions a Queue created here picks up the same loop
    # through asyncio.get_event_loop(), so dropping the argument is safe.
    pipe_to_socket = asyncio.Queue()
    socket_to_pipe = asyncio.Queue()
    socket_coro = connect_socket(pipe_to_socket, socket_to_pipe, loop=loop)
    pipe_coro = connect_pipe(socket_to_pipe, pipe_to_socket, loop=loop)
    loop.run_until_complete(asyncio.gather(socket_coro, pipe_coro))


# Only start the bridge when executed as a script, not when imported.
if __name__ == "__main__":
    main()
This code runs as the child process, which the parent starts with:
subprocess.Popen(["python3", test], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)
The problem is that socket_consumer puts the object (received from the socket) in the queue, but pipe_producer never gets past incoming.get().
The file writing is for testing purposes only.
At the moment the parent is this (also only for testing):
import subprocess

# Path of the child script that bridges its stdin/stdout with the web socket.
test = r"/home/host/PycharmProjects/Tim/Tim.py"
process = subprocess.Popen(["python3", test],
                           stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=2048)
# The payload is the same on every iteration, so build and encode it once.
message = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400}, the rest of json...}}'
jsonValueBytes = message.encode("utf-8")
payload = b"".join(jsonValueBytes + b"\n" for i in range(5))
# communicate() writes the payload, closes the child's stdin, reads its stdout
# and waits for it to exit. A bare wait() with stdout=PIPE can deadlock once
# the child has written enough output to fill the pipe, because nothing reads
# it here.
# NOTE: closing stdin makes the child see end of input on its pipe.
child_output, _ = process.communicate(input=payload)
To send a message to the web socket, I am using this code instead:
#!/usr/bin/env python
import asyncio
import websockets
async def hello(uri):
    """Connect to the web socket at ``uri`` and send one hard-coded message.

    Args:
        uri: Address of the web socket server to connect to.
    """
    payload = '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1,"camParameters":{"basicContainer":{"stationType":5,"referencePosition":{"latitude":451114425,"longitude":76720957,"positionConfidenceEllipse":{"semiMajorConfidence":4095,"semiMinorConfidence":4095,"semiMajorOrientation":3601},...other fields}}'
    auth_headers = {"Authorization": r"Basic XXXX="}
    # The connection is closed when the ``async with`` block is left.
    async with websockets.connect(uri, extra_headers=auth_headers) as connection:
        await connection.send(payload)
# Drive the one-shot sender to completion on the current event loop.
event_loop = asyncio.get_event_loop()
event_loop.run_until_complete(hello('XXX'))
Sending through the pipe works: the child receives the message on the pipe and sends it to the socket (the files FromPipe.txt and ToSocket.txt are correct).
I also have code that sends a message to a server over an open web socket, and this server forwards the message to the child. When the child receives it from the socket, the file FromSocket.txt is created, but ToPipe.txt is not created unless I move the file write before the await incoming.get().
FromSocket.txt
has this content:
From socket: '{"header":{"protocolVersion":1,"messageID":2,"stationID":400},"cam":{"generationDeltaTime":1, ... other field}}'
If the problem were in retrieving the message type, ToPipe.txt would still be created, since writing it is the first instruction after json_message = await incoming.get().
I think it is a problem with the queue.
As a test, I put the incoming.get() call
in socket_consumer, right after the await outgoing.put(message),
and it works.
UPDATE: If I run only the child (so without the pipe), ToPipe.txt is correct and the message transfer from socket to pipe works fine. In my test I run the parent, which sends one message over the pipe that the child forwards to the socket; then I send a message to the socket, and the child catches this message but does not send it to the pipe, and ToPipe.txt is not created. Maybe there is a problem in the main method.