
I currently have an application running on App Engine and I am executing a few jobs using the deferred library; some of these tasks run daily, while others are executed once a month. Most of these tasks query Datastore to retrieve entities and then store them as documents in an index (Search API). Some of these kinds are replaced monthly and I have to run these tasks on all entities (4~5M).

One example of such a task is:

import logging

from google.appengine.api import search
from google.appengine.ext import deferred

# Company is an ndb.Model defined elsewhere in the app.

def addCompaniesToIndex(cursor=None, n_entities=0, mindate=None):
    BATCH_SIZE = 200
    # fetch one page of Company entities from Datastore
    cps, next_cursor, more = Company.query().fetch_page(BATCH_SIZE,
                                                        start_cursor=cursor)

    doc_list = []
    for cp in cps:
        # create a search Index document from the Datastore entity;
        # this document has only about 5 text fields and one date field
        doc_list.append(getCompanyDocument(cp))

    # put the documents into the index
    index = search.Index(name='Company')
    index.put(doc_list)

    n_entities += len(doc_list)

    if more:
        logging.debug('Company: %d added to index', n_entities)
        deferred.defer(addCompaniesToIndex,
                       cursor=next_cursor,
                       n_entities=n_entities,
                       mindate=mindate)
    else:
        logging.debug('Finished Company index creation (%d processed)', n_entities)

When I run only one of these jobs, each deferred task takes around 4-5 s to execute, so indexing my 5M entities would take about 35 hours.

Another issue is that when I run an update on another index (e.g. one of the daily updates) using a different deferred task on the same queue, both run a lot slower, taking about 10-15 seconds per deferred call, which is just unbearable.

My question is: is there a way to do this faster and scale the push queue so that more than one task runs at a time? Or should I use a different approach for this problem?

Thanks in advance,

clds

2 Answers


By placing the if more statement at the end of the addCompaniesToIndex() function you're practically serializing the task execution: the next deferred task is not created until the current one has completed indexing its share of docs.

What you could do is move the if more statement right after the Company.query().fetch_page() call where you obtain (most of) the variables needed for the next deferred task execution.

This way the next deferred task would be created and enqueued (long) before the current one completes, so their processing can potentially overlap/be staggered. You will need some other modifications as well, for example handling the n_entities variable, which loses its current meaning in the updated scenario - but that's more or less cosmetic/informational, not essential to the actual doc indexing operation.
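
A minimal sketch of that reordering (reusing the question's imports, Company model and getCompanyDocument() helper; n_entities is kept only as a rough progress indicator):

def addCompaniesToIndex(cursor=None, n_entities=0, mindate=None):
    BATCH_SIZE = 200
    cps, next_cursor, more = Company.query().fetch_page(BATCH_SIZE,
                                                        start_cursor=cursor)

    # enqueue the next page immediately, before the slow indexing work below,
    # so consecutive tasks can overlap instead of running strictly one after another
    if more:
        deferred.defer(addCompaniesToIndex,
                       cursor=next_cursor,
                       n_entities=n_entities + len(cps),  # rough count of entities handed out so far
                       mindate=mindate)
    else:
        logging.debug('Last Company page reached (~%d entities handed out)',
                      n_entities + len(cps))

    # now build and store the index documents for this page
    doc_list = [getCompanyDocument(cp) for cp in cps]
    search.Index(name='Company').put(doc_list)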

If the number of deferred tasks is very high there is a risk of enqueueing too many of them simultaneously, which could cause an "explosion" in the number of instances GAE would spawn to handle them. If that is not desired, you can "throttle" the rate at which the deferred tasks are spawned by delaying their execution a bit; see https://stackoverflow.com/a/38958475/4495081.
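
One way to do that (a sketch, not the linked answer verbatim) is to pass the task queue _countdown option through deferred.defer(); the 2-second value below is only illustrative and would need tuning:

# delay the next deferred task a bit to limit how fast new tasks
# (and therefore new instances) pile up
if more:
    deferred.defer(addCompaniesToIndex,
                   cursor=next_cursor,
                   mindate=mindate,
                   _countdown=2)  # becomes eligible to run 2 s after being enqueued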

Dan Cornilescu
  • Hi Dan, I applied your ideas to my code, but I have the impression that reading from Datastore is actually a lot more expensive than inserting these entities in the index, so the throughput gain is not as great as I was expecting. I suppose that minimizing the size of the read operation might help; I will do a few more tests and get back to you. – clds Dec 05 '16 at 14:29
  • In such case it's probably better to do keys_only queries instead, then, after enqueuing the next task, assemble the list of keys for the page and perform batch reads for them to get the docs and update the index. – Dan Cornilescu Dec 05 '16 at 14:51
  • BTW - you can actually check your suspicion about the cost of the datastore reads: check your app's logs in the dev console - some of the log entries have blue links in the request duration column - click on the links and then "View trace" in the popup menu and you'll see appstats-like traces in StackDriver, so you can get a better idea of where the time is spent for that particular request. – Dan Cornilescu Dec 05 '16 at 15:00
  • Thanks Dan, I just did the tracing for the times but got this: http://imgur.com/ZLmkBm0 does that mean that the task is idle for most of the time? – clds Dec 05 '16 at 15:37
  • Not idle - that's just your actual app code running (other than RPC) - appstats only traces the RPC calls themselves (the green bars). I'm not sure if that includes the actual wait for and transfer of the RPC calls' results to/from your code or not. I suspect it doesn't as there really isn't much other code in your app to explain the relatively wide intervals between RPC calls. If the total size of your `Company` entities is larger than the length of their keys then the `keys_only` queries technique I mentioned should help. – Dan Cornilescu Dec 05 '16 at 18:09
  • Thanks again Dan, so just did some changes in the code to use the `keys_only` option on the fetch and after launching another `deferred` job do a `ndb.get_multi`, I was kind of expecting it to get a bit slower, but now each call is taking around 30s: http://imgur.com/XE9wVTL and the number of tasks running in parallel in the push queue hasn't really increased. – clds Dec 05 '16 at 19:48
  • I just did some tracing on the code using time.time() to understand better what is happening. It seems like once multiple tasks are running in parallel on the push queue the access time for datastore goes up tenfold or so. – clds Dec 07 '16 at 19:12
  • I think it could be related to this issue: http://stackoverflow.com/questions/25796142/appengine-query-fetch-async-not-very-asynchronous – clds Dec 07 '16 at 19:35

I think I finally managed to get around this issue by using two queues and the idea proposed by the previous answer.

  • On the first queue we only query the main entities (with keys_only) and launch another task on a second queue for those keys. The first task then relaunches itself on queue 1 with the next_cursor.
  • The second queue gets the entity keys and does all the reads and inserts into Full Text Search/BigQuery/PubSub (this part is slow, ~15 s per group of 100 keys).

I tried using only one queue as well, but the processing throughput was not as good. I believe this comes from the fact that we have slow and fast tasks running on the same queue, and the scheduler might not work as well in that case.
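
A rough sketch of that two-queue layout (the queue names, the indexCompanies() helper and the batch size are illustrative, not the exact code I ended up with; both queues would have to be defined in queue.yaml):

from google.appengine.api import search
from google.appengine.ext import deferred, ndb

def scanCompanies(cursor=None):
    BATCH_SIZE = 100
    # fast task on queue 1: keys_only paging through the Company kind
    keys, next_cursor, more = Company.query().fetch_page(BATCH_SIZE,
                                                         start_cursor=cursor,
                                                         keys_only=True)

    # relaunch the scan on queue 1 with the next cursor...
    if more:
        deferred.defer(scanCompanies, cursor=next_cursor, _queue='scan-queue')

    # ...and hand this page of keys to the slow worker on queue 2
    if keys:
        deferred.defer(indexCompanies,
                       [k.urlsafe() for k in keys],
                       _queue='index-queue')

def indexCompanies(urlsafe_keys):
    # slow task on queue 2: batch-read the entities and push documents to the index
    cps = ndb.get_multi([ndb.Key(urlsafe=k) for k in urlsafe_keys])
    doc_list = [getCompanyDocument(cp) for cp in cps if cp is not None]
    search.Index(name='Company').put(doc_list)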

clds