I'm trying to pause a celery task temporary based on user button click.
What I've done is:
When a user clicks a button; I release an AJAX request that updates my celery task state to "PAUSE"
Then; my tactic was to; when I initate a task into celery; it runs a for loop. Every for loop; I read my database 'state' and see if it's set to PAUSE: if it is set to pause; I want to sleep it for 60 seconds or sleep it until user hits resume button; same idea.
This is my code:
r = redis.StrictRedis(host='localhost', port=6379, db=0)
@celery.task(bind=True)
def runTask(self, arr)
for items in arr:
current_task_id = self.request.id
item = r.get('celery-task-meta-'+current_task_id)
load_as_json = json.loads(item)
if "PAUSE" in load_as_json['status']:
sleep(50)
@app.route('/start')
def start_task()
runTask.apply_async(args=[arr])
return 'task started running
Here is how my pause API endpoint looks like:
@app.route('/stop/<task_id>')
def updateTaskState():
task_id = request.cookie.get('task_id')
loadAsJson = json.loads(r.get('celery-task-meta-'+str(task_id)))
loadAsJson['status'] = 'PAUSE'
loadAsJson.update(loadAsJson)
dump_as_json = json.dumps(loadAsJson)
updated_state = r.set('celery-task-meta-'+last_key, dump_as_json)
return 'updated state';
From what I conceptually understand; is that the reason why I'm not seeing an updated state is because; the task is already executed and isnt able to retrieve updated values from database. FYI: The task update state is set to PAUSE immediately; I checked this by creating a seperate script that checks state within while loop; everytime I click the button that release AJAX request to update the state; my db gets updated and it reads "PAUSE" on the seperate script; however within the @celery.task decorator I can't seem to get the updated state.
Below is my seperate script I used to test; and it seems to be updatign state as expected; I just can't get the updated state within task decorator... weirdly.
r = redis.StrictRedis(host='localhost', port=6379, db=0)
last_key = r.keys()
while True:
response = r.get('celery-task-meta-b1534a87-e18b-4f0a-89e2-08348d833056')
loadAsJson = json.loads(response)
print loadAsJson['status']