
I am trying to read some data streams using protobuf in Python, and I want to use Trio to write the client that reads the streams. The protobuf module has some method calls, and I find they do not work when I use Trio streams.

The Python client runs on a Linux machine.

import struct

import trio

import DTCProtocol_pb2 as Dtc

async def parent(addr, encoding, heartbeat_interval):
    print(f"parent: connecting to 127.0.0.1:{addr[1]}")
    client_stream = await trio.open_tcp_stream(addr[0], addr[1])

    # encoding request
    print("parent: spawning encoding request ...")
    enc_req = create_enc_req(encoding) # construct encoding request
    await send_message(enc_req, Dtc.ENCODING_REQUEST,client_stream, 'encoding request') # send encoding request

    log.debug('get_response: started')
    response = await client_stream.receive_some(1024)
    m_size = struct.unpack_from('<H', response[:2])[0] # the size of the message (unpack_from returns a tuple)
    m_type = struct.unpack_from('<H', response[2:4])[0] # the type of the message
    m_body = response[4:]
    m_resp = Dtc.EncodingResponse()

m_body is some bytes data, which I don't know how to decode. Dtc.EncodingResponse() is the protobuf call that should give a Dtc object containing the response in a readable format. (Dtc is the module generated from the protobuf file.) But I get nothing here. When I wrote this script without Trio, Dtc.EncodingResponse() would give the full response in a readable format.

I am guessing the problem is that client_stream is a Trio stream object that only reads bytes, so I probably need to use a ReceiveChannel object instead. But if that is true, I don't know how to do it.

UPDATE: The answer below by Nathaniel J. Smith solves my problem.

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

I feel so silly, but I did not call ParseFromString on the data previously, and that was all it took. Extremely grateful to all who replied. Hope this helps someone out there.

  • I edited the question, and I gotta tell you something: never apologize for being a "noobie". Learning is good! – M.K Jun 14 '19 at 06:03
  • Thanks! really appreciate the encouragement! – cloud ostrich Jun 14 '19 at 06:17
  • While I absolutely agree with the point @M.K makes regarding apologizing for being a newbie, I think that the removal of the indentation of the code lines starting at `# encoding request` breaks the code. The name `client_stream` is created in the local scope of the `parent` coroutine, but is now accessed outside of it. Also, the `print` starting with `parent:` somewhat implies that this should still be part of the body of the coro. Can you please verify? – shmee Jun 14 '19 at 07:18
  • @shmee , you are correct. my code is badly formatted. I have edited the proper whitespace. Thanks for the warm and gentle advice. – cloud ostrich Jun 16 '19 at 05:58

1 Answer


Like @shmee said in the comment, I think your code got mangled some by the edits... you should double-check.

"When I did this script without trio, Dtc.EncodingResponse() would give the full response in readable format"

I think you might have dropped a line when switching to Trio? Dtc.EncodingResponse() just creates a new empty EncodingResponse object. If you want to parse the data from m_body into your new object, you have to do that explicitly, with something like:

m_resp = Dtc.EncodingResponse()
m_resp.ParseFromString(m_body)

However, there's another problem... the reason it's called receive_some is that it receives some bytes, but might not receive all the bytes you asked for. Your code assumes that a single call to receive_some will fetch all the bytes in the response. That might be true when you're doing a simple test, but in general it's not guaranteed. If you don't get enough data on the first call to receive_some, you need to keep calling it until you have all the data.

This is actually very standard... sockets work the same way. That's why the first thing your server sends is an m_size field at the beginning: it's so you can tell whether you've gotten all the data or not!
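
To make the framing idea concrete, here is a minimal sketch using only the struct module. It assumes, as the code in this answer does, a 4-byte header made of two little-endian unsigned 16-bit integers (size, then type), and that the size counts only the body. Check your protocol's documentation, since some protocols include the header in the size. The helper names and the type value 7 are made up for illustration.

```python
import struct

HEADER = struct.Struct('<HH')  # two little-endian uint16 fields: size, type

def frame(m_type, body):
    """Prefix body with a (size, type) header. Hypothetical helper."""
    return HEADER.pack(len(body), m_type) + body

def split_frame(data):
    """Return (m_type, body) if data holds a whole message, else None."""
    if len(data) < HEADER.size:
        return None  # the header itself is incomplete
    m_size, m_type = HEADER.unpack_from(data)  # unpack always returns a tuple
    if len(data) < HEADER.size + m_size:
        return None  # the body is incomplete; keep reading
    return m_type, data[HEADER.size:HEADER.size + m_size]

message = frame(7, b'hello')
```

Here split_frame(message) returns the type and body, while split_frame(message[:6]) returns None because only part of the body has arrived. That None case is exactly the situation a single receive_some call can leave you in.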

Unfortunately, as of June 2019, Trio doesn't provide a helper to do this loop for you – you can track progress on that in this issue. In the meantime, it's possible to write your own. I think something like this should work:

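import struct

import DTCProtocol_pb2 as Dtc

async def receive_exactly(stream, count):
    # Keep reading until exactly `count` bytes have arrived.
    buf = bytearray()
    while len(buf) < count:
        # Ask only for what is still missing, so we never read past this message.
        new_data = await stream.receive_some(count - len(buf))
        if not new_data:
            # An empty result means the peer closed the connection.
            raise RuntimeError("other side closed the connection unexpectedly")
        buf += new_data
    return buf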

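async def receive_encoding_response(stream):
    # The 4-byte header holds two little-endian uint16 values: size and type.
    header = await receive_exactly(stream, 4)
    (m_size, m_type) = struct.unpack('<HH', header)
    m_body = await receive_exactly(stream, m_size)
    m_resp = Dtc.EncodingResponse()
    # Parse the body (not the size); bytes() converts the bytearray to be safe.
    m_resp.ParseFromString(bytes(m_body))
    return m_resp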
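
If you want to try the loop without a server, something like the following sketch may help. ChunkyStream is a hypothetical stand-in for a Trio stream that hands out at most 3 bytes per receive_some call, and read_message returns the raw body instead of a parsed protobuf object so that the example has no dependency on DTCProtocol_pb2. It is driven with asyncio.run only so that it runs without Trio installed; with a real Trio stream you would call the same coroutines from inside trio.run. The header layout is the same assumption as above (size, then type, with the size counting only the body).

```python
import asyncio
import struct

class ChunkyStream:
    """Hypothetical fake stream: returns at most `chunk` bytes per call."""
    def __init__(self, data, chunk):
        self._data = data
        self._chunk = chunk

    async def receive_some(self, max_bytes):
        n = min(max_bytes, self._chunk)
        piece, self._data = self._data[:n], self._data[n:]
        return piece  # b"" once the data runs out, like a closed connection

async def receive_exactly(stream, count):
    buf = bytearray()
    while len(buf) < count:
        new_data = await stream.receive_some(count - len(buf))
        if not new_data:
            raise RuntimeError("other side closed the connection unexpectedly")
        buf += new_data
    return bytes(buf)

async def read_message(stream):
    header = await receive_exactly(stream, 4)
    m_size, m_type = struct.unpack('<HH', header)
    body = await receive_exactly(stream, m_size)
    return m_type, body

# A 9-byte message: header (size=5, type=7) followed by the body.
wire = struct.pack('<HH', 5, 7) + b'hello'

# A single call returns only the first 3 bytes, not the whole message.
first_read = asyncio.run(ChunkyStream(wire, 3).receive_some(1024))

# Looping until the count is met recovers the full message.
result = asyncio.run(read_message(ChunkyStream(wire, 3)))
```

A truncated input such as ChunkyStream(wire[:6], 3) makes read_message raise the RuntimeError, which is the behaviour you want when the server disconnects in the middle of a message.
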
Nathaniel J. Smith
  • Thank you so very much for your reply, parsing the data from m_body into the new object did indeed give me the data in readable format. I feel so silly, the solution is so simple. But I am extremely grateful all the same. – cloud ostrich Jun 17 '19 at 04:12