18

I'm using testbed to unit test my google app engine app, and my app uses a taskqueue.

When I submit a task to a taskqueue during a unit test, it appears that the task is in the queue, but the task does not execute.

How do I get the task to execute during a unit test?

new name
  • Why do you want the task to execute? Wouldn't it make more sense to assert that the task has been added, then unit-test the execution of the task independently? – Nick Johnson Jul 10 '11 at 07:31
  • @Nick, that might be a better way of doing it. Doing it this way, I don't have to recreate the state when the task was queued. Also, executing the enqueued task and testing the results also helps test that the task was enqueued correctly. – new name Jul 11 '11 at 15:12

4 Answers

24

Using Saxon's excellent answer, I was able to do the same thing using testbed instead of gaetestbed. Here is what I did.

Added this to my setUp():

    # Requires: from google.appengine.api import apiproxy_stub_map
    self.taskqueue_stub = apiproxy_stub_map.apiproxy.GetStub('taskqueue')

Then, in my test, I used the following:

    # Execute the task in the taskqueue
    tasks = self.taskqueue_stub.GetTasks("default")
    self.assertEqual(len(tasks), 1)
    task = tasks[0]
    params = base64.b64decode(task["body"])
    response = self.app.post(task["url"], params)

Somewhere along the line, the POST parameters get base64-encoded, so I had to decode them to get it to work.
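To make the decoding step concrete, here is a minimal sketch that does not need the SDK. It assumes the task body is a base64-encoded, URL-encoded form string; `decode_task_params` and the hand-built `sample_task` are hypothetical names used only for illustration.

```python
import base64

try:
    from urllib.parse import parse_qs  # Python 3
except ImportError:
    from urlparse import parse_qs  # Python 2


def decode_task_params(task):
    """Return the form fields carried in a task's base64-encoded body."""
    raw = base64.b64decode(task["body"])
    return parse_qs(raw.decode("utf-8"))


# Hand-built task for illustration; a real one would come from GetTasks().
sample_task = {"url": "/work", "body": base64.b64encode(b"user=alice&count=3")}
params = decode_task_params(sample_task)
```

Parsing the body this way is only needed if the test wants to inspect individual fields; for replaying the task, posting the decoded string as-is should be enough.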

I like this better than Saxon's answer since I can use the official testbed package and I can do it all within my own test code.

EDIT: I later wanted to do the same thing with tasks submitted using the deferred library, and it took a bit of headbanging to figure it out, so I'm sharing it here to ease other people's pain.

If your taskqueue contains only tasks submitted with deferred, then this will run all of the tasks and any tasks queued by those tasks:

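import base64
import pickle

def submit_deferred(taskq):
    # Run every deferred task in the default queue, including tasks that
    # those tasks enqueue, until the queue is empty.
    tasks = taskq.GetTasks("default")
    taskq.FlushQueue("default")
    while tasks:
        for task in tasks:
            # deferred pickles each call as a (func, args, kwargs) tuple.
            (func, args, kwargs) = pickle.loads(base64.b64decode(task["body"]))
            func(*args, **kwargs)
        tasks = taskq.GetTasks("default")
        taskq.FlushQueue("default")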
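
The drain loop can be tried without the SDK. The sketch below is not the real stub: `FakeTaskQueueStub`, its `enqueue` helper, and `run_deferred` are hypothetical, and the body encoding (base64 over a pickled `(func, args, kwargs)` tuple) is an assumption chosen to match what the loop unpacks.

```python
import base64
import operator
import pickle


class FakeTaskQueueStub(object):
    """Hypothetical in-memory stand-in; only GetTasks and FlushQueue are modelled."""

    def __init__(self):
        self._queues = {"default": []}

    def enqueue(self, func, *args, **kwargs):
        # Assumed encoding: base64 over a pickled (func, args, kwargs) tuple,
        # which is the shape the drain loop unpacks.
        body = base64.b64encode(pickle.dumps((func, args, kwargs)))
        self._queues["default"].append({"body": body})

    def GetTasks(self, queue_name):
        return list(self._queues[queue_name])

    def FlushQueue(self, queue_name):
        self._queues[queue_name] = []


def run_deferred(taskq):
    """Drain the default queue and return each task's result in order."""
    results = []
    tasks = taskq.GetTasks("default")
    taskq.FlushQueue("default")
    while tasks:
        for task in tasks:
            func, args, kwargs = pickle.loads(base64.b64decode(task["body"]))
            results.append(func(*args, **kwargs))
        tasks = taskq.GetTasks("default")
        taskq.FlushQueue("default")
    return results


stub = FakeTaskQueueStub()
stub.enqueue(operator.add, 1, 2)
stub.enqueue(max, 3, 9)
stub.enqueue(int, "ff", base=16)  # keyword arguments are passed through too
results = run_deferred(stub)
```

In a real test, the equivalent call would presumably go right after the code under test has enqueued its deferred work, with the stub obtained in setUp() passed in.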
new name
  • Would it be correct to then remove the task from the task queue after it has been executed? Otherwise you are basically just leaving it in there? Is there any way to get it to "auto-remove" after the task has been run, just like it would in production? – Stephen Cagle Aug 31 '11 at 01:51
  • @Stephen, in the second part of my answer, FlushQueue does exactly that. – new name Aug 31 '11 at 16:16
  • NOTE: calling FlushQueue or DeleteTask will not leave a tombstone... so if you're naming your tasks then your tests may miss any TombstonedTaskError lurking around. I'm calling `taskq._GetGroup().GetQueue('default').Delete(task['name'])` to delete the task from the queue but leave the tombstone – Nicholas Franceschina Jan 15 '16 at 19:51
  • In the second scenario, where is `submit_deferred` called from? Are you actually submitting deferred tasks using it when in the test sandbox, or is this wired into your test setup? – pdoherty926 Mar 12 '19 at 15:17
14

Another (cleaner) option to achieve this is to use the task queue stub within the testbed. To do this you first have to initialize the task queue stub by adding the following to your setUp() method:

# Requires: from google.appengine.ext import testbed
self.testbed = testbed.Testbed()
self.testbed.activate()
self.testbed.init_taskqueue_stub()

The task queue stub can then be retrieved using the following code:

taskq = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

The interface for working with the queue stub is as follows:

# Returns a list of dictionaries with information about the available queues
GetQueues()

# Returns a list of dictionaries with information about the tasks in a given queue
GetTasks(queue_name)

# Removes the task task_name from the given queue
DeleteTask(queue_name, task_name)

# Removes all the tasks from the queue
FlushQueue(queue_name)

# Returns tasks from the given queues, filtered by task name and by the URL the task points to
get_filtered_tasks(url, name, queue_names)

# Executes the queued tasks
StartBackgroundExecution()

# Requests the task scheduler to shut down
Shutdown()

Also, because this uses the App Engine SDK's own facilities, it works just fine with the deferred library.

Drag0nR3b0rn
  • Thanks for the tip. Are you using the same stub but accessing it in a different way? – new name Sep 07 '11 at 18:27
  • Actually `init_taskqueue_stub()` is almost the same as `apiproxy_stub_map.apiproxy.GetStub('taskqueue')`. The first provides some abstraction and validity over the second option - which can be considered a lower level API. You can find the detail by looking on the implementation of the testbed - http://code.google.com/p/googleappengine/source/browse/trunk/python/google/appengine/ext/testbed/__init__.py (the interesting function is `_register_stub` on line 283). – Drag0nR3b0rn Sep 07 '11 at 20:17
  • Is it possible to complete all tasks synchronously, so that a call to StartBackgroundExecution() would only return once all queued tasks have been completed? – user1055761 Aug 12 '17 at 23:38
8

The dev app server is single-threaded, so it can't run tasks in the background while the foreground thread is running the tests.

I modified TaskQueueTestCase in taskqueue.py in gaetestbed to add the following function:

def execute_tasks(self, application):
    """
    Executes all currently queued tasks, and also removes them from the
    queue.
    The tasks are executed against the provided web application.
    """

    # Set up the application for webtest to use (resetting _app in case a
    # different one has been used before). 
    self._app = None
    self.APPLICATION = application

    # Get all of the tasks, and then clear them.
    tasks = self.get_tasks()
    self.clear_task_queue()

    # Run each of the tasks, checking that they succeeded.
    for task in tasks:
        response = self.post(task['url'], task['params'])
        self.assertOK(response)

For this to work, I also had to change the base class of TaskQueueTestCase from BaseTestCase to WebTestCase.

My tests then do something like this:

# Do something which enqueues a task.

# Check that a task was enqueued, then execute it.
self.assertEqual(len(self.get_tasks()), 1)
self.execute_tasks(some_module.application)

# Now test that the task did what was expected.

This executes the task directly from the foreground unit test. It is not quite the same as in production (i.e., the task would get executed 'some time later' on a separate request), but it works well enough for me.
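
The snapshot-then-clear order matters when a task enqueues follow-up work. Here is a minimal sketch of that pattern in plain Python, assuming nothing from gaetestbed: `FakeQueue`, the `handlers` routing table, and both handler functions are hypothetical stand-ins for the queue helpers and the web application.

```python
class FakeQueue(object):
    """Hypothetical stand-in for the test case's queue helpers."""

    def __init__(self):
        self.tasks = []

    def add(self, url, params):
        self.tasks.append({"url": url, "params": params})


def execute_tasks(queue, handlers):
    """Run every currently queued task in the foreground; return status codes."""
    # Snapshot and clear first, so tasks enqueued by a handler stay queued
    # for the next call instead of being wiped.
    tasks, queue.tasks = queue.tasks, []
    return [handlers[task["url"]](task["params"]) for task in tasks]


queue = FakeQueue()
sent = []
logged = []


def send_email(params):
    sent.append(params["to"])
    # This handler enqueues a follow-up task while it runs.
    queue.add("/log", {"msg": "mailed " + params["to"]})
    return 200


def log_message(params):
    logged.append(params["msg"])
    return 200


handlers = {"/send": send_email, "/log": log_message}
queue.add("/send", {"to": "a@example.com"})
first = execute_tasks(queue, handlers)   # runs /send, which enqueues /log
second = execute_tasks(queue, handlers)  # runs the follow-up /log task
```

Each call processes one "generation" of tasks, which lets a test assert on the intermediate state between generations if it needs to.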

Saxon Druce
  • Thanks! At this point is it better to use gaetestbed rather than the official testbed? The meager documentation for testbed suggests that gaetestbed has more features. – new name Jul 09 '11 at 13:58
  • Saxon, I was able to do the same thing with the official testbed. Thanks for pointing in the right direction. – new name Jul 09 '11 at 16:13
  • No worries :) I only just saw your question from 2 hours ago - I've only used gaetestbed, so not sure how it compares with testbed. – Saxon Druce Jul 09 '11 at 16:16
4

You might want to try the following code. Full explanation is here: http://www.geewax.org/task-queue-support-in-app-engines-ext-testbed/

import unittest
from google.appengine.api import taskqueue
from google.appengine.ext import testbed


class TaskQueueTestCase(unittest.TestCase):

  def setUp(self):
    self.testbed = testbed.Testbed()
    self.testbed.activate()
    self.testbed.init_taskqueue_stub()
    # Keep a handle on the stub; the attribute name must match the one used in the tests.
    self.taskqueue_stub = self.testbed.get_stub(testbed.TASKQUEUE_SERVICE_NAME)

  def tearDown(self):
    self.testbed.deactivate()

  def testTaskAdded(self):
    taskqueue.add(url='/path/to/task')

    tasks = self.taskqueue_stub.get_filtered_tasks(url='/path/to/task')
    self.assertEqual(1, len(tasks))
    self.assertEqual('/path/to/task', tasks[0].url)

if __name__ == '__main__':
  unittest.main()
JJ Geewax