I'm using Asyncio in Python 3.4, I'll try to explain what I'm doing to this point and what I (think) is causing the issue.
On one end I have a UDP connection framework with blocking operations, I'm taking the data I get from this stream and creating json that I pass out to the client in SSE format. This is all working great.
The issue I'm running into is that I can't get it to handle client disconnects properly if I don't do anything and a client disconnects I'll start getting this error:
WARNING [selector_events:613] socket.send() raised exception.
since the loop is still running, I've been looking into ways to cleanly break the loop and trigger the .close() but I'm running into issues with the examples I've found and there's not many resources online.
The one example that seems to actually work is trying to read a line from the client and if it's an empty string that means the client disconnected.
while True:
data = (yield from client_reader.readline())
if not data: #client disconnected
break
however after about ten messages all messages to the client stop, I think this is because it's hanging on "data = (yield from client_reader.readline())" after it hangs if I close the client then it does properly shut down and "End Connection" does get called. Any ideas why it might be hanging? I think I have a pretty good handle on Asyncio at this point but this one is puzzling me.
Note: location() and status() are my two calls to get info from the UDP socket - I've successfully run them without issues for many hours with this same code - minus the client disconnect lines.
clients = {}
def accept_client(client_reader, client_writer):
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_writer)
def client_done(task):
del clients[task]
client_writer.close()
log.info("End Connection")
log.info("New Connection")
task.add_done_callback(client_done)
@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
yield from postmessage(data,client_writer)
while True:
data = (yield from client_reader.readline())
if not data: #client disconnected
break
data = yield from asyncio.wait_for(location(),
timeout=1.0)
yield from postmessage(data,client_writer)
data = yield from asyncio.wait_for(status(),
timeout=1.0)
yield from postmessage(data,client_writer)
@asyncio.coroutine
def postmessage(data, client_writer):
mimetype=('text/event-stream')
response = ('data: {0}\n\n'.format(data).encode('utf-8'))
client_writer.write(response)
client_writer.drain()
Update: if I add a timeout on the "yield from client_reader" I get the following error when it gets to the point that it would normally hang.
2014-11-17 03:13:56,214 INFO [try:23] End Connection
2014-11-17 03:13:56,214 ERROR [base_events:912] Task exception was never retrieved
future: <Task finished coro=<handle_client() done, defined at try.py:29> exception=TimeoutError()>
Traceback (most recent call last):
File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 236, in _step
result = next(coro)
File "try.py", line 35, in handle_client
timeout=1.0))
File "/opt/python3.4.2/lib/python3.4/asyncio/tasks.py", line 375, in wait_for
raise futures.TimeoutError()
concurrent.futures._base.TimeoutError
Here's a sample script showing the bug in action - just run it in python 3.4.2 and after 9 iterations it'll hang on reading from the client.
(The script complete so you can run it to see for yourself)
import asyncio
import logging
import json
import time
log = logging.getLogger(__name__)
clients = {}
def accept_client(client_reader, client_writer):
task = asyncio.Task(handle_client(client_reader, client_writer))
clients[task] = (client_writer)
def client_done(task):
del clients[task]
client_writer.close()
log.info("End Connection")
log.info("New Connection")
task.add_done_callback(client_done)
@asyncio.coroutine
def handle_client(client_reader, client_writer):
data = {'result':{'status':'Connection Ready'}}
postmessage(data,client_writer)
count = 0
while True:
data = (yield from asyncio.wait_for(client_reader.readline(),timeout=1.0))
if not data: #client disconnected
break
data = yield from asyncio.wait_for(test1(),timeout=1.0)
yield from postmessage(data,client_writer)
data = yield from asyncio.wait_for(test2(),timeout=1.0)
yield from postmessage(data,client_writer)
@asyncio.coroutine
def postmessage(data, client_writer):
mimetype=('text/event-stream')
response = ('data: {0}\n\n'.format(data).encode('utf-8'))
client_writer.write(response)
client_writer.drain()
@asyncio.coroutine
def test1():
data = {'result':{
'test1':{ }
}
}
data = json.dumps(data)
return data
@asyncio.coroutine
def test2():
data = {'result':{
'test2':{ }
}
}
data = json.dumps(data)
return data
def main():
loop = asyncio.get_event_loop()
f = asyncio.start_server(accept_client, host=None, port=2991)
loop.run_until_complete(f)
loop.run_forever()
if __name__ == '__main__':
log = logging.getLogger("")
formatter = logging.Formatter("%(asctime)s %(levelname)s " +
"[%(module)s:%(lineno)d] %(message)s")
# log the things
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
log.addHandler(ch)
main()
Another update: I've found it dies because it reads all the lines from the header of the client and then times out when it runs out of lines. I think, the real answer I'm looking for is how to detect client disconnects when you don't actually need to receive data from the client (other then the initial connection).