I have an asynchronous python TCP socket that does not close connections. Nor does a for loop run that should close all exceptional sockets supplied by select.select(). However the layout of my code seemingly allows for all of the above.
What I want to happen is:
inputs = []
create server (non-blocking)
inputs.append(server)
while true:
readable, _, exceptional = select.select(inputs, [], inputs)
for s in readable:
if server:
accept new client and append to inputs
if client:
accept length indicator data (non-blocking)
if no data:
close client and remove from inputs
if data:
get more data
if no data:
close client and remove from inputs
if data:
Place data in Queue.Queue
keep socket alive and in inputs
(my logic for this is that the client may send data
sooner rather than later and can do so across the same
connection - however this doesn't seem to happen, they
seem to always go into the exceptional list)
Here is some further context. I cannot program the clients (as of yet) I believe they are designed in c# (is there anything different about a c# socket communicating with a python server socket?) The clients are small IoT devices essentially, they send a very small amount of data (48 bytes). The goal is to have them constantly connected with the socket (not reconnecting) so as to increase data transfer rates.
Here is the actual code (partially reduced):
#!/usr/bin/python
"""
WebSocket with Hive Readers as clients
@Created: 21/06/2017
@Version: 2.0
@Last_Update:30/06/2017
@author: Daniel Cull
"""
import socket
import threading
import errno
import logging
import urllib2
import json
import datetime
import timeit
import Queue
import time
import select
from HTMLParser import HTMLParser
logging.basicConfig(filename='/home/dsp/logs/hiveBridge.log',
level=logging.DEBUG,
format='%(asctime)s %(levelname)s: %(threadName)s - %(message)s')
BUF_SIZE = 10
q = Queue.Queue(BUF_SIZE)
# Defines the hostname and port to listen on
host = socket.gethostname()
port = 9007
url = 'http://pearlai-text-dev.eu-west-1.elasticbeanstalk.com/api'
record_len = 47
class MLStripper(HTMLParser):
def __init__(self):
HTMLParser.__init__(self)
self.reset()
self.fed = []
def handle_data(self, d):
self.fed.append(d)
def get_data(self):
return ''.join(self.fed)
class ProducerThread(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
super(ProducerThread, self).__init__()
self.target = target
self.name = name
def run(self):
def receive_n_bytes(n, client):
data = ''
# while len(data) < n:
try:
logging.debug('Trying to receive %s from %s', n, s.getpeername())
chunk = client.recv(n)
data += chunk
except socket.error as se:
logging.error("Err Message: %s", se)
logging.error("ErrNo: %s", se.errno)
logging.error("Err Code: %s", errno.errorcode[se.errno])
data = ''
pass
return data
# Defines the socket family and socket to use.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.setblocking(0)
# Binds the socket to the host and port and begins listening for up to 5 client connections
# Anymore than 5 connections will be rejected until there is room
try:
server.bind((host, port))
except socket.error as e:
logging.error("Err Message: %s", e)
logging.error("ErrNo: %s", e.errno)
logging.error("Err Code: %s", errno.errorcode[e.errno])
server.listen(5)
inputs = [server]
logging.info("-----------------------------")
logging.info("---- Started Hive Socket ----")
logging.info("-----------------------------")
logging.info("Listening for clients on %s:%s", host, port)
while True:
readable, _, exceptional = select.select(inputs, [], inputs)
if not q.full():
start = timeit.default_timer()
for s in readable:
if s is server:
try:
(client_sock, (client_ip, client_port)) = s.accept()
client_sock.setblocking(0)
inputs.append(client_sock)
logging.info("Connection from %s on port %s", client_ip, client_port)
except socket.error as e:
logging.error("Err Message: %s", e)
logging.error("ErrNo: %s", e.errno)
logging.error("Err Code: %s", errno.errorcode[e.errno])
pass
else:
# 1st Byte = Length Indicator. Receive 1st byte to know how many to receive next
len_indicator = receive_n_bytes(1, s)
if len_indicator:
# Allows indicator to be read
len_indicator_b = bytearray(len_indicator)
# Every record is 47 bytes long, 5 as a header, 2 as a footer, data in the middle.
msg_len = (len_indicator_b[0] * record_len)
logging.info("Received Header, Length = %s", msg_len)
divides_by = msg_len % record_len
# if it isn't 47 in length: stop.
if divides_by == 0:
# Receive the rest of the data
message_data = receive_n_bytes(msg_len, s)
if message_data:
logging.debug('MESSAGE DATA SIZE: %s', len(message_data))
logging.debug('INPUT QUEUE SIZE: %s', len(inputs))
logging.debug('READABLE QUEUE SIZE: %s', len(readable))
logging.debug('EXCEPTIONAL QUEUE SIZE: %s', len(inputs))
q.put(message_data)
logging.info('Putting a new message in the queue. Queue Size: %s', str(q.qsize()))
# inputs.remove(s)
# s.close()
stop = timeit.default_timer()
execution_time = stop - start
logging.info('Execution Time: %s', execution_time)
else:
# After two days of constant data this else statement has NEVER OCCURRED
inputs.remove(s)
logging.debug('Attempting to close socket: %s', s.getpeername())
s.shutdown(2)
s.close()
stop = timeit.default_timer()
execution_time = stop - start
logging.info('No Data Found. Execution Time: %s', execution_time)
else:
logging.debug('Wrong length of data received: %s % 47 = %s', msg_len, divides_by)
else:
inputs.remove(s)
logging.debug('Attempting to close socket: %s', s.getpeername())
s.shutdown(2)
s.close()
stop = timeit.default_timer()
execution_time = stop - start
logging.info('No Data Found. Execution Time: %s', execution_time)
# This for loop never runs!? :(
for s in exceptional:
logging.debug('Closing exceptional socket %s', s.getpeername())
inputs.remove(s)
s.close()
return
class ConsumerThread(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, verbose=None):
super(ConsumerThread, self).__init__()
self.target = target
self.name = name
return
def run(self):
def calc_values(byte_data):
# Does calculations here to output the values from the message.
if __name__ == '__main__':
p = ProducerThread(name='producer')
c = ConsumerThread(name='consumer')
p.start()
time.sleep(2)
c.start()
time.sleep(2)
I want to add in a dictionary to keep track of clients and hopefully output more information. However from my logs I can see that socket connections do get reused (sometimes) but in general all sockets stay in inputs. This list grows to hundreds of sockets when it should only be about 60.
The readable list always seems to contain only 1 socket at a time The exceptional list contains all other sockets from the inputs array.
Here are some log lines near the end of a test:
2017-10-26 14:50:28,880 INFO: consumer - Execution Time: 0.10178899765
2017-10-26 14:50:45,958 DEBUG: producer - Trying to receive 1 from ('82.68.232.180', 17627)
2017-10-26 14:50:45,958 INFO: producer - Received Header, Length = 47
2017-10-26 14:50:45,958 DEBUG: producer - Trying to receive 47 from ('82.68.232.180', 17627)
2017-10-26 14:50:45,958 DEBUG: producer - MESSAGE DATA SIZE: 47
2017-10-26 14:50:45,958 DEBUG: producer - INPUT QUEUE SIZE: 252
2017-10-26 14:50:45,959 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-26 14:50:45,959 DEBUG: producer - EXCEPTIONAL QUEUE SIZE: 252
2017-10-26 14:50:45,959 INFO: producer - Putting a new message in the queue. Queue Size: 1
2017-10-26 14:50:45,959 INFO: producer - Execution Time: 0.000602960586548
2017-10-26 14:50:45,959 INFO: producer - Connection from 82.68.232.180 on port 22108
2017-10-26 14:50:46,084 DEBUG: producer - Trying to receive 1 from ('82.68.232.180', 22108)
2017-10-26 14:50:46,084 INFO: producer - Received Header, Length = 47
2017-10-26 14:50:46,084 DEBUG: producer - Trying to receive 47 from ('82.68.232.180', 22108)
2017-10-26 14:50:46,084 DEBUG: producer - MESSAGE DATA SIZE: 47
2017-10-26 14:50:46,084 DEBUG: producer - INPUT QUEUE SIZE: 253
2017-10-26 14:50:46,084 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-26 14:50:46,084 DEBUG: producer - EXCEPTIONAL QUEUE SIZE: 253
2017-10-26 14:50:46,084 INFO: producer - Putting a new message in the queue. Queue Size: 1
2017-10-26 14:50:46,084 INFO: producer - Execution Time: 0.000601053237915
2017-10-26 14:50:46,104 INFO: consumer - Message Posted. Response Code: 200
2017-10-26 14:50:46,104 INFO: consumer - {'x': 1, 'readerId': '43', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '1', 'datetime': '2017/10/26 14:50:45.982926', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540029216', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-26 14:50:46,104 INFO: consumer - Execution Time: 0.121989965439
2017-10-26 14:50:46,493 INFO: consumer - Message Posted. Response Code: 200
2017-10-26 14:50:46,494 INFO: consumer - {'x': 1, 'readerId': '63', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '67', 'datetime': '2017/10/26 14:50:46.412514', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540423968', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-26 14:50:46,494 INFO: consumer - Execution Time: 0.0815811157227
2017-10-26 14:50:49,606 DEBUG: producer - Trying to receive 1 from ('82.68.232.180', 13316)
2017-10-26 14:50:49,606 INFO: producer - Received Header, Length = 47
2017-10-26 14:50:49,607 DEBUG: producer - Trying to receive 47 from ('82.68.232.180', 13316)
2017-10-26 14:50:49,607 DEBUG: producer - MESSAGE DATA SIZE: 47
2017-10-26 14:50:49,607 DEBUG: producer - INPUT QUEUE SIZE: 253
2017-10-26 14:50:49,607 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-26 14:50:49,607 DEBUG: producer - EXCEPTIONAL QUEUE SIZE: 253
2017-10-26 14:50:49,607 INFO: producer - Putting a new message in the queue. Queue Size: 1
2017-10-26 14:50:49,607 INFO: producer - Execution Time: 0.000608205795288
2017-10-26 14:50:49,728 INFO: consumer - Message Posted. Response Code: 200
2017-10-26 14:50:49,728 INFO: consumer - {'x': 1, 'readerId': '22', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '0', 'datetime': '2017/10/26 14:50:49.627017', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540028960', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-26 14:50:49,728 INFO: consumer - Execution Time: 0.101749897003
2017-10-26 14:50:51,694 DEBUG: producer - Trying to receive 1 from ('82.68.232.180', 21170)
2017-10-26 14:50:51,694 INFO: producer - Received Header, Length = 47
2017-10-26 14:50:51,694 DEBUG: producer - Trying to receive 47 from ('82.68.232.180', 21170)
2017-10-26 14:50:51,694 DEBUG: producer - MESSAGE DATA SIZE: 47
2017-10-26 14:50:51,694 DEBUG: producer - INPUT QUEUE SIZE: 253
2017-10-26 14:50:51,694 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-26 14:50:51,694 DEBUG: producer - EXCEPTIONAL QUEUE SIZE: 253
2017-10-26 14:50:51,694 INFO: producer - Putting a new message in the queue. Queue Size: 1
2017-10-26 14:50:51,695 INFO: producer - Execution Time: 0.00062108039856
2017-10-26 14:50:51,856 INFO: consumer - Message Posted. Response Code: 200
2017-10-26 14:50:51,857 INFO: consumer - {'x': 1, 'readerId': '18', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '26', 'datetime': '2017/10/26 14:50:51.714788', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540161568', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-26 14:50:51,857 INFO: consumer - Execution Time: 0.142374992371
2017-10-26 14:50:55,586 DEBUG: producer - Trying to receive 1 from ('82.68.232.180', 20627)
2017-10-26 14:50:55,586 INFO: producer - Received Header, Length = 47
2017-10-26 14:50:55,586 DEBUG: producer - Trying to receive 47 from ('82.68.232.180', 20627)
2017-10-26 14:50:55,586 DEBUG: producer - MESSAGE DATA SIZE: 47
2017-10-26 14:50:55,586 DEBUG: producer - INPUT QUEUE SIZE: 253
2017-10-26 14:50:55,586 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-26 14:50:55,586 DEBUG: producer - EXCEPTIONAL QUEUE SIZE: 253
2017-10-26 14:50:55,586 INFO: producer - Putting a new message in the queue. Queue Size: 1
2017-10-26 14:50:55,586 INFO: producer - Execution Time: 0.000593900680542
2017-10-26 14:50:55,707 INFO: consumer - Message Posted. Response Code: 200
2017-10-26 14:50:55,708 INFO: consumer - {'x': 1, 'readerId': '59', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '69', 'datetime': '2017/10/26 14:50:55.606343', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540424480', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-26 14:50:55,708 INFO: consumer - Execution Time: 0.101861000061
2017-10-26 14:50:58,475 DEBUG: producer - Trying to receive 1 from ('82.68.232.180', 20627)
2017-10-26 14:50:58,476 INFO: producer - Received Header, Length = 47
2017-10-26 14:50:58,476 DEBUG: producer - Trying to receive 47 from ('82.68.232.180', 20627)
2017-10-26 14:50:58,476 DEBUG: producer - MESSAGE DATA SIZE: 47
2017-10-26 14:50:58,476 DEBUG: producer - INPUT QUEUE SIZE: 253
2017-10-26 14:50:58,476 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-26 14:50:58,476 DEBUG: producer - EXCEPTIONAL QUEUE SIZE: 253
2017-10-26 14:50:58,476 INFO: producer - Putting a new message in the queue. Queue Size: 1
2017-10-26 14:50:58,476 INFO: producer - Execution Time: 0.000596046447754
2017-10-26 14:50:58,597 INFO: consumer - Message Posted. Response Code: 200
2017-10-26 14:50:58,598 INFO: consumer - {'x': 1, 'readerId': '59', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '69', 'datetime': '2017/10/26 14:50:58.496193', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540424480', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-26 14:50:58,598 INFO: consumer - Execution Time: 0.101848840714
I have asked a lot of questions, but the main thing that's really bugging me is THAT SECOND FOR LOOP... IT NEVER RUNS! It's baffling.
EDIT: I made a select.poll() version that also does not recycle sockets... whats going on :(
EDIT 2: I made some changes to the code and added a method of timing out the sockets based on their last_read time. However there seems to be 22 sockets in the inputs list that do not have a last_read time, I don't understand how that can even happen. Its the same code as above, tidied. With these extra loops:
for s in last_read:
to_close = [s for s in last_read if s not in readable and now - last_read[s] > 315]
for s in to_close:
logging.debug('Socket Timeout: %s', clients[s])
try:
inputs.remove(s)
except ValueError as e:
logging.debug(e)
pass
s.close()
del last_read[s]
del clients[s]
Here are some logs:
2017-10-27 14:54:35,472 INFO: consumer - {'x': 1, 'readerId': '20', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '3', 'datetime': '2017/10/27 14:54:35.429098', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540029728', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-27 14:54:35,473 INFO: consumer - Execution Time: 0.043848991394
2017-10-27 14:54:42,814 DEBUG: producer - INPUT QUEUE SIZE: 76
2017-10-27 14:54:42,814 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-27 14:54:42,814 DEBUG: producer - LAST_READ QUEUE SIZE: 54
2017-10-27 14:54:42,814 INFO: producer - New message put in the queue. Queue Size: 1 | Message Size: 47
2017-10-27 14:54:42,814 INFO: producer - Execution Time: 0.000533819198608
2017-10-27 14:54:42,895 INFO: consumer - Message Posted. Response Code: 200
2017-10-27 14:54:42,895 INFO: consumer - {'x': 1, 'readerId': '12', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '62', 'datetime': '2017/10/27 14:54:42.815721', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540422688', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-27 14:54:42,895 INFO: consumer - Execution Time: 0.0798990726471
2017-10-27 14:54:50,406 INFO: producer - Connection from ('82.68.232.180', 8609)
2017-10-27 14:54:50,609 DEBUG: producer - INPUT QUEUE SIZE: 77
2017-10-27 14:54:50,609 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-27 14:54:50,609 DEBUG: producer - LAST_READ QUEUE SIZE: 55
2017-10-27 14:54:50,609 INFO: producer - New message put in the queue. Queue Size: 1 | Message Size: 47
2017-10-27 14:54:50,609 INFO: producer - Execution Time: 0.000415086746216
2017-10-27 14:54:50,736 INFO: consumer - Message Posted. Response Code: 200
2017-10-27 14:54:50,736 INFO: consumer - {'x': 1, 'readerId': '56', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '61', 'datetime': '2017/10/27 14:54:50.610891', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540422432', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-27 14:54:50,736 INFO: consumer - Execution Time: 0.125483036041
2017-10-27 14:54:51,761 DEBUG: producer - INPUT QUEUE SIZE: 77
2017-10-27 14:54:51,761 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-27 14:54:51,761 DEBUG: producer - LAST_READ QUEUE SIZE: 55
2017-10-27 14:54:51,761 INFO: producer - New message put in the queue. Queue Size: 1 | Message Size: 47
2017-10-27 14:54:51,761 INFO: producer - Execution Time: 0.000416040420532
2017-10-27 14:54:51,899 INFO: consumer - Message Posted. Response Code: 200
2017-10-27 14:54:51,899 INFO: consumer - {'x': 1, 'readerId': '533', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '68', 'datetime': '2017/10/27 14:54:51.762411', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540424224', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-27 14:54:51,899 INFO: consumer - Execution Time: 0.137360095978
2017-10-27 14:54:52,235 INFO: producer - Connection from ('82.68.232.180', 34801)
2017-10-27 14:54:52,462 DEBUG: producer - INPUT QUEUE SIZE: 78
2017-10-27 14:54:52,462 DEBUG: producer - READABLE QUEUE SIZE: 1
2017-10-27 14:54:52,462 DEBUG: producer - LAST_READ QUEUE SIZE: 56
2017-10-27 14:54:52,462 INFO: producer - New message put in the queue. Queue Size: 1 | Message Size: 47
2017-10-27 14:54:52,462 INFO: producer - Execution Time: 0.0003981590271
2017-10-27 14:54:52,536 INFO: consumer - Message Posted. Response Code: 200
2017-10-27 14:54:52,536 INFO: consumer - {'x': 1, 'readerId': '70', 'locationSensorId': '0', 'lux': '0', 'locationCounter': '0', 'alarm': '0', 'rssi': '58', 'datetime': '2017/10/27 14:54:52.463618', 'magnet': '3', 'y': 1, 'uniqueName': 'hive', 'v': 1, 'agesent': '540358688', 'sensorId': '7777777', 'z': 1, 'movement': '65535'}
2017-10-27 14:54:52,536 INFO: consumer - Execution Time: 0.0730180740356
EDIT 3: The inputs list is still slowly growing and is now twice the size of the last_read list. So sockets must be connecting, then never going into the readable queue?
If I had a: for s in inputs: to_be_closed = [s for s in inputs if s not in readable and s not in last_read]
Does this make sense?