
I need to come up with a strategy to process and update documents in an Elasticsearch index periodically and efficiently. I do not need to look at documents that I have already processed.

My setting is that I have a long-running process which continuously inserts documents into an index, say approx. 500 documents per hour (think of the common logging example).

I need to find a solution to update some number of documents periodically (e.g. via a cron job) by running my own code on a specific field (a text field, for example) to enrich the document with a number of new fields. I want to do this to offer more fine-grained aggregations on the index. In the logging analogy this could mean: I take the User-Agent string from a log entry (document), parse it, add some new fields back to that document, and re-index it.
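To illustrate, the kind of enrichment I have in mind looks roughly like this (the function, field names, and the crude string check are just placeholders for what my actual parsing code would do):

def enrich_user_agent(doc_source):
    """Toy example: derive new fields from the raw User-Agent string."""
    ua = doc_source.get("user_agent", "")

    # a real implementation would call a proper UA-parsing library here
    browser = "Firefox" if "Firefox" in ua else "other"

    return {
        "ua_browser": browser,
        "ua_is_bot": "bot" in ua.lower(),
    }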

So my approach would be:

  1. Get some number of documents (or even all) that I haven't looked at before. I could query them by combining must_not and exists, for instance (see the sketch after this list).
  2. Run my code on these documents (run the parser, compute some new stuff, whatever).
  3. Update the documents obtained in step 1 (preferably via the bulk API).
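For step 1, the query I have in mind would look roughly like this (ua_browser stands in for whichever field my code adds):

query = {
    "query": {
        "bool": {
            # only match documents that do not yet have the enrichment field
            "must_not": {"exists": {"field": "ua_browser"}}
        }
    }
}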

I know there is the Update By Query API. But it does not seem to be the right tool here, since I need to run my own code (which, by the way, depends on external libraries) on my server, not as a Painless script, which would not offer the comprehensive functionality I need.
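For comparison, with the Update By Query API I would be restricted to something like the following, where all the logic has to live in a Painless script (es is an Elasticsearch client instance; the index name, field name, and script body are trivial placeholders):

es.update_by_query(
    index="myindex",
    body={
        "query": {"bool": {"must_not": {"exists": {"field": "ua_browser"}}}},
        # the enrichment logic would have to be written in Painless here,
        # which rules out my external parsing libraries
        "script": {
            "source": "ctx._source.ua_browser = 'unknown'",
            "lang": "painless",
        },
    },
)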

I am accessing Elasticsearch via Python.

The problem is now that I don't know how to implement the above approach. E.g., what if the number of documents obtained in step 1 is larger than myindex.settings.index.max_result_window?

Any ideas?

matthaeus
  • The approach looks fine. For 1. you can use the Elasticsearch `scroll` API; here's a Python abstraction of it - https://elasticsearch-py.readthedocs.io/en/master/helpers.html#scan - it allows you to fetch millions of docs matching your query in batches. In your bulk update request, though, you will have to ensure that the number of docs being updated stays below the max limit (if the changes are across different fields with different values, you might have to do an update per doc). – Jay Jan 10 '22 at 18:37
  • Thank you @Jay. I had a look at the API and [the ES documentation of the scroll API](https://www.elastic.co/guide/en/elasticsearch/reference/7.16/paginate-search-results.html#scroll-search-results). I think this could work for me for the moment. However, the documentation states that they "[no longer recommend using the scroll API for deep pagination. If you need to preserve the index state while paging use .. point in time (PIT)](https://www.elastic.co/guide/en/elasticsearch/reference/7.16/paginate-search-results.html#scroll-search-results)" – matthaeus Jan 10 '22 at 22:41
  • If you don't care about state when you first query, then you can use the PIT - https://stackoverflow.com/questions/59105657/elasticsearch-pagination – Jay Jan 11 '22 at 04:40
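For reference, the PIT plus search_after pattern mentioned in these comments would look roughly like this in Python (keep-alive, batch size, and the field name are illustrative):

pit = es.open_point_in_time(index=index, keep_alive="1m")

body = {
    "size": 1000,
    "query": {"bool": {"must_not": {"exists": {"field": "ua_browser"}}}},
    "pit": {"id": pit["id"], "keep_alive": "1m"},
    "sort": [{"_shard_doc": "asc"}],  # tiebreaker sort required for search_after
}

while True:
    resp = es.search(body=body)
    hits = resp["hits"]["hits"]
    if not hits:
        break
    # ... process and update this batch of documents ...
    body["search_after"] = hits[-1]["sort"]
    body["pit"]["id"] = resp.get("pit_id", body["pit"]["id"])  # the PIT id may change

es.close_point_in_time(body={"id": body["pit"]["id"]})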

1 Answer


I considered @Jay's comment and ended up with this pattern, for the moment:

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from elasticsearch.helpers import scan

from my_module.postprocessing import post_process_doc

es = Elasticsearch(...)
es.ping()

def update_docs(docs):
    """Generate bulk update actions for the scanned documents."""
    for idx, doc in enumerate(docs):
        if idx % 10000 == 0:
            print('next 10k')

        new_field_value = post_process_doc(doc)

        # partial-document update: only the new field is added or changed
        doc_update = {
            "_index": doc["_index"],
            "_id": doc["_id"],
            "_op_type": "update",
            "doc": {<<the new field>>: new_field_value},
        }

        yield doc_update

# scroll through all documents that do not have the new field yet
query = {"query": {"bool": {"must_not": {"exists": {"field": <<the new field>>}}}}}
docs = scan(es, query=query, index=index, scroll="1m", preserve_order=True)

bulk(es, update_docs(docs))
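Note that the bulk helper splits the generated actions into chunks on its own (500 actions per request by default, configurable via its chunk_size parameter), and scan pulls the matching documents in scroll batches, so neither side runs into index.max_result_window.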

matthaeus