0

I am using pika to connect to RabbitMQ and consume messages. When I start the script in the Ubuntu production environment it works as expected, but it opens MySQL connections and never closes them, and the MySQL server eventually fails with "Too many connections".

I would appreciate any recommendations on the code below, as I cannot understand what is going wrong. Thank you in advance.

The flow is the following

  1. Start pika on Python 3
  2. Subscribe to a channel and wait for messages
  3. In the callback I do various validations and save or update data in MySQL
  4. The problem is visible in the Ubuntu htop screenshot at the end of the question: new MySQL entries keep being added at the top of the list

Pika version = 0.13.0

For MySQL I use pymysql.

Pika Script

def main():
    credentials = pika.PlainCredentials(tunnel['queue']['name'], tunnel['queue']['password'])

    while True:
        try:
            cp = pika.ConnectionParameters(
                host=tunnel['queue']['host'],
                port=tunnel['queue']['port'],
                credentials=credentials,
                ssl=tunnel['queue']['ssl'],
                heartbeat=600,
                blocked_connection_timeout=300
            )

            connection = pika.BlockingConnection(cp)
            channel = connection.channel()

            def callback(ch, method, properties, body):
                if 'messageType' in properties.headers:
                    message_type = properties.headers['messageType']

                    if events.get(message_type):
                        result = Descriptors._reflection.ParseMessage(events[message_type]['decode'], body)
                        if result:
                            result = protobuf_to_dict(result)
                            model.write_response(external_response=result, message_type=message_type)
                    else:
                        app_log.warning('Message type not in allowed list = ' + str(message_type))
                        app_log.warning('continue listening...')

            channel.basic_consume(callback, queue=tunnel['queue']['name'], no_ack=True)
            try:
                channel.start_consuming()
            except KeyboardInterrupt:
                channel.stop_consuming()
                connection.close()
                break
        except pika.connection.exceptions.ConnectionClosed as e:
            app_log.error('ConnectionClosed :: %s' % str(e))
            continue
        except pika.connection.exceptions.AMQPChannelError as e:
            app_log.error('AMQPChannelError :: %s' % str(e))
            continue
        except Exception as e:
            app_log.error('Connection was closed, retrying... %s' % str(e))
            continue


if __name__ == '__main__':
    main()

Inside the script I have a model that does inserts or updates in the database; the code is below:

def write_response(self, external_response, message_type):
    table_name = events[message_type]['table_name']
    original_response = external_response[events[message_type]['response']]
    if isinstance(original_response, list):
        external_response = []
        for o in original_response:
            record = self.map_keys(o, message_type, events[message_type].get('values_fix', {}))
            external_response.append(self.validate_fields(record))
    else:
        external_response = self.map_keys(original_response, message_type, events[message_type].get('values_fix', {}))
        external_response = self.validate_fields(external_response)

    if not self.mysql.open:
        self.mysql.ping(reconnect=True)

    with self.mysql.cursor() as cursor:
        if isinstance(original_response, list):
            for e in external_response:
                id_name = events[message_type]['id_name']
                filters = {id_name: e[id_name]}
                self.event(
                    cursor=cursor,
                    table_name=table_name,
                    filters=filters,
                    external_response=e,
                    message_type=message_type,
                    event_id=e[id_name],
                    original_response=e  # not required here
                )
        else:
            id_name = events[message_type]['id_name']
            filters = {id_name: external_response[id_name]}
            self.event(
                cursor=cursor,
                table_name=table_name,
                filters=filters,
                external_response=external_response,
                message_type=message_type,
                event_id=external_response[id_name],
                original_response=original_response
            )
    cursor.close()
    self.mysql.close()

    return

On Ubuntu I use systemd to run the script and restart it in case something goes wrong. Below is the systemd unit file:

[Unit]
Description=Pika Script
Requires=stunnel4.service
Requires=mysql.service
Requires=mongod.service

[Service]
User=user
Group=group
WorkingDirectory=/home/pika_script
ExecStart=/home/user/venv/bin/python pika_script.py
Restart=always

[Install]
WantedBy=multi-user.target

Image from Ubuntu htop, showing how MySQL entries keep being added to the list and are never closed: [image: htop screenshot]

Error

tornado_mysql.err.OperationalError: (1040, 'Too many connections')
marc_s
aaa

3 Answers

1

I have found the issue; posting it in case it helps somebody else.

The problem was that mysqld went into an infinite loop trying to create indexes for a specific database. It never succeeded and kept trying again and again.

The solution was to find out which database it was, remove it, and recreate it. The mysqld process then went back to normal, and the infinite loop of index creation disappeared as well.

aaa
1

I would say increasing the connection limit may solve your problem temporarily.

1st, find out why the application is not closing the connection after it completes a task.

2nd, look for any slow queries/calls on the DB and fix them if there are any.

3rd, if there are no slow queries/calls on the DB and the application closes the connection/thread immediately after completing the task, then consider adjusting "wait_timeout" on the MySQL side.
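For the first point, one way to make sure a connection is always released is to open and close it per unit of work. Below is a minimal sketch, assuming a DB-API style driver such as pymysql. The names `db_session` and `connect` are hypothetical and are not part of the script in the question.

```python
from contextlib import contextmanager


@contextmanager
def db_session(connect):
    """Open a connection, hand out a cursor, and always close both.

    `connect` is any zero-argument callable that returns a DB-API
    connection (assumed here: a small wrapper around pymysql.connect).
    """
    conn = connect()
    cursor = conn.cursor()
    try:
        yield cursor
        conn.commit()      # commit only if the block finished without error
    except Exception:
        conn.rollback()    # undo partial work, then re-raise
        raise
    finally:
        cursor.close()
        conn.close()       # runs on success and on failure
```

In a consumer callback this could be used as `with db_session(make_conn) as cursor: ...`, so that a message that raises cannot leave a connection open. Whether this matches how the model in the question manages `self.mysql` is an assumption.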

Vaibhav
0

According to this answer, if you have MySQL 5.7 and 5.8:

It is worth knowing that if you run out of usable disc space on your server partition or drive, that this will also cause MySQL to return this error. If you're sure it's not the actual number of users connected then the next step is to check that you have free space on your MySQL server drive/partition.
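The free-space check described in that quote can be scripted. This is a small sketch using only the standard library; `free_space_report` is a hypothetical helper name, and the path to check is up to you.

```python
import shutil


def free_space_report(path):
    """Return (free_bytes, free_fraction) for the filesystem holding `path`."""
    usage = shutil.disk_usage(path)
    return usage.free, usage.free / usage.total
```

On Ubuntu the MySQL data directory is commonly `/var/lib/mysql`, but that is an assumption; the `datadir` setting in your MySQL configuration gives the actual location to pass in.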

From the same thread: you can inspect and increase the number of MySQL connections.
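As a rough illustration of the inspection step, the sketch below reads the connection limit, the idle timeout, and the current number of connected threads through any DB-API cursor. `connection_report` is a hypothetical helper, and it assumes a plain cursor that returns rows as tuples (the pymysql default).

```python
def connection_report(cursor):
    """Collect connection limits and current usage via a DB-API cursor."""
    queries = {
        "max_connections": "SHOW GLOBAL VARIABLES LIKE 'max_connections'",
        "wait_timeout": "SHOW GLOBAL VARIABLES LIKE 'wait_timeout'",
        "threads_connected": "SHOW GLOBAL STATUS LIKE 'Threads_connected'",
    }
    report = {}
    for key, sql in queries.items():
        cursor.execute(sql)
        row = cursor.fetchone()          # rows look like (name, value)
        report[key] = int(row[1]) if row else None
    return report
```

If `threads_connected` stays close to `max_connections`, the limit can be raised with `SET GLOBAL max_connections = <n>`, although that only buys time when connections are leaking.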

rok
  • Increasing the number of connections is not a solution: when monitoring, I see a new connection appear every 10 seconds – aaa Jul 06 '20 at 15:01
  • Are you talking about the `htop` output? Those are not new `mysql` client connections; they are multiple `mysqld` server processes. You have issues and restart `mysqld` each time? – rok Jul 07 '20 at 07:21
  • Yes, I restart it every time, because there are too many processes and everything crashes after a while – aaa Jul 07 '20 at 07:30