I have a distributed computation framework built on Celery, RabbitMQ, and supervisor. Each worker task reads from a database, computes some values, and updates the database once the computation is done. However, when I run multiple workers in a distributed fashion, I keep hitting this error:

(2014, "Commands out of sync; you can't run this command now")

Can anyone suggest a way to set up a mutex or lockfile-like mechanism so that the workers can access the database concurrently?

Any help will be appreciated, thanks, Amit

Edit :-

con = mdb.connect(parameters...)

def reset_table(table_name,con):
    with con:
        cur = con.cursor(mdb.cursors.DictCursor)
        cur.execute("UPDATE " + table_name + " SET active_status = 0 WHERE last_access < (NOW() - INTERVAL 15 MINUTE)")
        con.commit()

StackTrace :-

  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 238, in trace_task
    R = retval = fun(*args, **kwargs)
  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 416, in __protected_call__
    return self.run(*args, **kwargs)
  File "/home/elasticsearch/celery_test/tasks.py", line 183, in download_data
    auth = get_auth(con)
  File "/home/elasticsearch/celery_test/tasks.py", line 94, in get_auth
    reset_table("auths",con)
  File "/usr/lib/python2.7/dist-packages/MySQLdb/connections.py", line 249, in __exit__
    self.rollback()
ProgrammingError: (2014, "Commands out of sync; you can't run this command now")
Amit Gupta

1 Answer

I wouldn't rush to add a locking mechanism. Instead, I would first look at how your client is using the database. The MySQL docs on "Commands out of sync" say:

If you get Commands out of sync; you can't run this command now in your client code, you are calling client functions in the wrong order.
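
One common way to end up with calls in the wrong order is having several workers issue commands on the same connection object at the same time. I can't confirm that this is what is happening in your case, but since `con` is created once at module level it is worth ruling out. Below is a minimal sketch of `reset_table` that opens its own connection on every call and commits exactly once. The `connect` argument is a hypothetical zero-argument factory that I am introducing for illustration (for example, a lambda wrapping `mdb.connect` with your parameters); it is not part of your code or of MySQLdb.

```python
from contextlib import closing


def reset_table(connect, table_name):
    """Deactivate stale rows using a connection owned by this call only."""
    # `connect` is a hypothetical factory, e.g.
    #   lambda: mdb.connect(host=..., user=..., passwd=..., db=...)
    con = connect()
    try:
        # closing() guarantees the cursor is closed before we commit.
        with closing(con.cursor()) as cur:
            cur.execute(
                "UPDATE " + table_name + " SET active_status = 0 "
                "WHERE last_access < (NOW() - INTERVAL 15 MINUTE)"
            )
        # Commit explicitly, once, instead of relying on `with con:`.
        con.commit()
    except Exception:
        # Undo the partial transaction, then let the task see the error.
        con.rollback()
        raise
    finally:
        # Never leave the connection around for another worker to reuse.
        con.close()
```

Opening a connection per call costs a little more than reusing one, but it means no two tasks can interleave commands on the same connection. If that turns out to be too slow, a per-process connection created after the worker starts would be the next thing to try.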

If you do decide to implement locking (which, again, I wouldn't recommend), one good way to do it is as follows:

import redis

have_lock = False
my_lock = redis.Redis().lock("my_key")
try:
    have_lock = my_lock.acquire(blocking=True)
    if have_lock:
        print("Got lock.")
    else:
        print("Did not acquire lock.")

finally:
    if have_lock:
        my_lock.release()

For a detailed explanation, see http://loose-bits.com/2010/10/distributed-task-locking-in-celery.html

Guy Gavriely
  • Does MySQL-python implement wait functions by default? I am sorry, but I don't understand how I can make it work without a wait function somewhere in the code. – Amit Gupta Jan 08 '14 at 02:53
  • Hard to tell without seeing your queries, SQL, etc. Also, it's better to include the entire stack trace rather than just the error itself. – Guy Gavriely Jan 08 '14 at 02:59
  • I see, so, do you know what that `with` statement actually does? see http://stackoverflow.com/questions/8067690/context-manager-for-pythons-mysqldb – Guy Gavriely Jan 08 '14 at 03:21
  • I am sorry I still don't understand the answer. It seems like I actually don't need to implement a lockfile mechanism. I just need to make sure the calls to db are being made correctly, and mysql_free_result is being called. However, I am not sure how to ensure that. – Amit Gupta Jan 08 '14 at 18:49
  • My guess is that `with` and `con.commit()` are not going well together. Delete `con.commit()` and see how it goes. – Guy Gavriely Jan 08 '14 at 19:17