
I'm trying to process an entire CSV file as fast as possible, so I'm processing each line in parallel as a Celery task. The cleanup, which is also a Celery task, has to wait until every line is processed. See the example below.

The problem is, I can't seem to get through a file because I keep running into connection errors with MySQL. So far, I've seen these two: `(2013, 'Lost connection to MySQL server during query')` and `(2006, 'MySQL server has gone away')`.

from app.db.meta import Session
from celery import chord, Celery
from celery.signals import task_postrun

celery = Celery()
celery.config_from_object('config')

@task_postrun.connect
def close_session(*args, **kwargs):
    # remove the scoped session after every task so its
    # connection is returned to the pool
    Session.remove()

def main():
    # process each line in parallel
    header = [process_line.s(line) for line in csv_file]
    # pass stats to cleanup after all lines are processed
    callback = cleanup.s()
    chord(header)(callback)

@celery.task
def process_line(line):
    session = Session()
    ...
    # process line
    ...
    return stats

@celery.task
def cleanup(stats):
    session = Session()
    ...
    # do cleanup and log stats
    ...

I'm using Celery 3.1.18 and SQLAlchemy 0.9.9, with connection pooling.
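
For context, `Session` is a `scoped_session`. A minimal sketch of the setup in `app.db.meta` (the connection URL and pool size here are illustrative, not my exact config):

from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session, sessionmaker

# illustrative engine/session setup; the real module uses its own
# URL and pool settings (the pool holds up to 100 connections)
engine = create_engine('mysql://root@localhost/ab__development', pool_size=100)
Session = scoped_session(sessionmaker(bind=engine))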

mysql> SHOW FULL PROCESSLIST;
+----+------+-----------+-----------------+---------+------+----------------+-----------------------+
| Id | User | Host      | db              | Command | Time | State          | Info                  |
+----+------+-----------+-----------------+---------+------+----------------+-----------------------+
|  1 | root | localhost | ab__development | Sleep   | 4987 |                | NULL                  |
| 11 | root | localhost | ab__development | Sleep   | 1936 |                | NULL                  |
| 16 | root | localhost | ab__development | Sleep   |  143 |                | NULL                  |
| 17 | root | localhost | ab__development | Sleep   | 1045 |                | NULL                  |
| 18 | root | localhost | NULL            | Query   |    0 | init           | SHOW FULL PROCESSLIST |
| 21 | root | localhost | ab__development | Sleep   |    7 |                | NULL                  |
+----+------+-----------+-----------------+---------+------+----------------+-----------------------+
6 rows in set (0.01 sec)
  • No value set for `max_connections`, so I assume the default of 100. – BDuelz Jun 03 '15 at 21:03
  • `max_connections=151` – BDuelz Jun 03 '15 at 21:22
  • I can't copy the entire output of `show processlist`. But I see 6 rows, all with the same user and host. 5 have the same db (the app database) while the other says NULL; 5 show Sleep as the command while the other shows Query; 5 have large Time values while the other has 0. – BDuelz Jun 03 '15 at 21:26
  • I see 6 rows, MAX(id) == 21 if that answers the question – BDuelz Jun 03 '15 at 21:31
  • That is the output while the celery tasks are running. – BDuelz Jun 03 '15 at 21:35
  • New row - this just happened `| 27 | root | localhost | ab__development | Query | 0 | Writing to net | rollback |` – BDuelz Jun 03 '15 at 21:40
  • The output of `show processlist` is not useful - the queries have already failed by the time you type it. The one showing `rollback` may well be significant, but not that informative. – scytale Jun 04 '15 at 11:10
  • Can you update your code to show the database operations please? And can you also do `SHOW TABLE ...` for the table(s) being operated on? – scytale Jun 04 '15 at 11:13
  • Check http://stackoverflow.com/a/21132756/3571 , it might be that you need to specify pool_recycle in the call to create_engine – codeape Jun 04 '15 at 16:03
  • I found out something weird today - whenever I first start my celery worker and kick off the chord task, I get connection issues before the tasks get a chance to finish. However, every time after the first time, it runs all the way through. – BDuelz Jun 05 '15 at 02:59
  • @codeape I'm using a connection pool of 100 already. – BDuelz Jun 05 '15 at 03:00
  • Yes, but you might still have a problem where connections in the pool time out. And do you properly release sessions after they're used? I don't know what `Session` is, but if it is a `scoped_session`, make sure you call `session.remove()` at the end of each task (commonly done in a finally block). – codeape Jun 05 '15 at 07:32
  • @codeape `Session` is a `scoped_session`. I hear what you're saying, but I have a signal that runs after each task that does just that. – BDuelz Jun 05 '15 at 14:14
  • When I run celery with one process, everything works just fine. `worker -c 1` – BDuelz Jun 05 '15 at 17:36
  • Also, doing this before each task runs seems to work when using more than one process: `Session.bind.engine.dispose(); Session.bind.engine.connect()` (see the sketch after these comments). – BDuelz Jun 05 '15 at 17:55
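
A cleaner version of that last workaround is to reset the engine once per worker process instead of before every task. This is a sketch, assuming the default prefork worker pool: connections created before the fork are shared file descriptors, so each child should discard them and open its own.

from celery.signals import worker_process_init
from app.db.meta import Session

@worker_process_init.connect
def reset_db_connections(*args, **kwargs):
    # drop any session state and discard pooled connections that were
    # inherited from the parent process across fork()
    Session.remove()
    Session.bind.dispose()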

1 Answer


Read the answer. In short, you have to either disable SQLAlchemy's connection pool or ping the MySQL server on checkout, so that dead connections are detected before a query runs:

from flask_sqlalchemy import SQLAlchemy
from sqlalchemy import event, exc


def instance(app):
    """:rtype: SQLAlchemy"""
    db = SQLAlchemy(app)

    if app.testing:
        return db

    @event.listens_for(db.engine, 'checkout')
    def checkout(dbapi_con, con_record, con_proxy):
        try:
            try:
                # ping(False) checks the connection without reconnecting
                # and raises if the server has gone away
                dbapi_con.ping(False)
            except TypeError:
                # older drivers take no argument and reconnect themselves
                app.logger.debug('MySQL connection died. Restoring...')
                dbapi_con.ping()
        except dbapi_con.OperationalError as e:
            app.logger.warning(e)
            if e.args[0] in (2006, 2013, 2014, 2045, 2055):
                # connection-level error: make the pool discard this
                # connection and check out a fresh one
                raise exc.DisconnectionError()
            else:
                raise

    return db
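
As mentioned in the comments, passing `pool_recycle` to `create_engine` is the other common fix: it replaces pooled connections before they outlive MySQL's `wait_timeout`. A sketch with illustrative values (note also that SQLAlchemy 1.2+ added `pool_pre_ping=True`, which builds the ping-on-checkout behaviour above into the pool itself):

from sqlalchemy import create_engine

# recycle connections after an hour so they never outlive the server's
# wait_timeout; URL and interval are illustrative
engine = create_engine(
    'mysql://root@localhost/ab__development',
    pool_recycle=3600,
)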