I had using pika to make a connection to RabbitMQ and consume message, once I start the script on ubuntu prod environment it is working as expected but is opening mysql connection and never closes them and ends up in Too many connection on mysql server.
Will appreciate any recommendation on the code below, as well can not understand what is going wrong. Thanking you in advance.
The flow is the following
- Starting pika on Python3
- Subscribe to a channel and waiting for messages
- In callback i do various validation and save or update data inside MySql
- The result that is showing the problem is the at the end of question a screenshot from ubuntu htop, that is showing new connection on MySql and keep adding them on the top
Pika Verion = 0.13.0
For MySql I use pymysql.
Pika Script
def main():
credentials = pika.PlainCredentials(tunnel['queue']['name'], tunnel['queue']['password'])
while True:
try:
cp = pika.ConnectionParameters(
host=tunnel['queue']['host'],
port=tunnel['queue']['port'],
credentials=credentials,
ssl=tunnel['queue']['ssl'],
heartbeat=600,
blocked_connection_timeout=300
)
connection = pika.BlockingConnection(cp)
channel = connection.channel()
def callback(ch, method, properties, body):
if 'messageType' in properties.headers:
message_type = properties.headers['messageType']
if events.get(message_type):
result = Descriptors._reflection.ParseMessage(events[message_type]['decode'], body)
if result:
result = protobuf_to_dict(result)
model.write_response(external_response=result, message_type=message_type)
else:
app_log.warning('Message type not in allowed list = ' + str(message_type))
app_log.warning('continue listening...')
channel.basic_consume(callback, queue=tunnel['queue']['name'], no_ack=True)
try:
channel.start_consuming()
except KeyboardInterrupt:
channel.stop_consuming()
connection.close()
break
except pika.connection.exceptions.ConnectionClosed as e:
app_log.error('ConnectionClosed :: %s' % str(e))
continue
except pika.connection.exceptions.AMQPChannelError as e:
app_log.error('AMQPChannelError :: %s' % str(e))
continue
except Exception as e:
app_log.error('Connection was closed, retrying... %s' % str(e))
continue
if __name__ == '__main__':
main()
Inside the script i have a model that doing inserts or updated in the database, code below
def write_response(self, external_response, message_type):
table_name = events[message_type]['table_name']
original_response = external_response[events[message_type]['response']]
if isinstance(original_response, list):
external_response = []
for o in original_response:
record = self.map_keys(o, message_type, events[message_type].get('values_fix', {}))
external_response.append(self.validate_fields(record))
else:
external_response = self.map_keys(original_response, message_type, events[message_type].get('values_fix', {}))
external_response = self.validate_fields(external_response)
if not self.mysql.open:
self.mysql.ping(reconnect=True)
with self.mysql.cursor() as cursor:
if isinstance(original_response, list):
for e in external_response:
id_name = events[message_type]['id_name']
filters = {id_name: e[id_name]}
self.event(
cursor=cursor,
table_name=table_name,
filters=filters,
external_response=e,
message_type=message_type,
event_id=e[id_name],
original_response=e # not required here
)
else:
id_name = events[message_type]['id_name']
filters = {id_name: external_response[id_name]}
self.event(
cursor=cursor,
table_name=table_name,
filters=filters,
external_response=external_response,
message_type=message_type,
event_id=external_response[id_name],
original_response=original_response
)
cursor.close()
self.mysql.close()
return
On ubuntu i use systemd to run the script and restart in case something goes wrong, below is systemd file
[Unit]
Description=Pika Script
Requires=stunnel4.service
Requires=mysql.service
Requires=mongod.service
[Service]
User=user
Group=group
WorkingDirectory=/home/pika_script
ExecStart=/home/user/venv/bin/python pika_script.py
Restart=always
[Install]
WantedBy=multi-user.target
Image from ubuntu htop, how the MySql keeps adding in the list and never close it
Error
tornado_mysql.err.OperationalError: (1040, 'Too many connections')