
I maintain an index with roughly 500 million documents. Among other fields, each document has a string field that contains between 1 and 10 words. I'd like to analyze this field in each document with regard to its word count and store the result in a "wordCount" field on the respective document.

I know that there is the partial_update functionality, documented here: ES documentation to partial_update

I wonder if a scripted partial_update (maybe with an advanced Groovy script) can be used to significantly speed up the above task. And if so, can someone give a hint on how to start?

Currently, I am using the Python script below, but it's painfully slow at this scale, due to the many network roundtrips and large payload sizes:

#!/usr/bin/env python
#-*- coding: utf-8 -*-

import elasticsearch
from elasticsearch import helpers
import pyes
from datetime import datetime


def getKeywordLength(text):
    # Word count: split on whitespace so consecutive spaces don't inflate the count
    return len(text.split())

indices = ["corpus"]

uri2 = "%s:%d" % ("http://localhost", 9200)
connection2 = pyes.ES([uri2], timeout=2000000)
es = elasticsearch.Elasticsearch(timeout=2000000)

def start():
    elasticSearchIndexName = indices[0]

    ###build search query to iterate over all records
    squery ='{"sort": [{"timestampUpdated": {"order": "asc","ignore_unmapped": true}}],"query": {"filtered": {"query": {"bool": {"should": [{"query_string": {"query": "*"}}]}}}}}'

    ###fetch a scrolling handle over all records
    items = helpers.scan(es, query=squery, index=elasticSearchIndexName, scroll='360s', size=1000, timeout=2000000)

    ###iterate over all records
    for i in items:
        try:
            indexName = i["_index"]
            timestamp = datetime.now().isoformat()
            keyword = i["_source"]["keyword"]
            i["_source"]["keywordLength"] = getKeywordLength(keyword)
            i["_source"]["timestampUpdated"] = timestamp
            # Re-indexes the complete document via pyes, so the full
            # _source travels over the network for every single record
            result = connection2.index(i["_source"], indexName, "items", id=i['_id'])
            print result
        except Exception:
            # On any error (e.g. an expired scroll), restart the scan
            # from the beginning rather than aborting the job
            start()
            return
start()
Jabb
  • Well, if network latency is the bottleneck, I doubt a Groovy solution would be any faster. Python has some concurrency solutions: https://wiki.python.org/moin/Concurrency/ And Groovy has GPars: http://www.gpars.org/guide/ Do you think processing the records concurrently will work for you? – Emmanuel Rosa Sep 04 '15 at 16:41
  • Using Groovy here refers to Elasticsearch's scripting engine, which runs within Elasticsearch itself. This has the advantage that documents do not need to be moved back and forth to Python or any other script; that is what I meant by "fewer roundtrips and less payload". Running Python concurrently is possible, but I'd need to come up with additional logic to feed different chunks to each script instance/thread without creating race conditions (see the sketch below). – Jabb Sep 04 '15 at 17:30
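
As an illustration of that last comment (not from the original thread): elasticsearch-py also ships a helpers.parallel_bulk helper that can consume the scan generator with several worker threads, and bulk update actions (_op_type "update" with a partial doc) avoid re-sending the full _source. Each action targets a single _id, so no extra chunking or race-condition logic is needed. A minimal sketch, assuming the index, type, and field names from the question:

# Sketch: partial updates streamed through parallel_bulk (elasticsearch-py)
import elasticsearch
from elasticsearch import helpers
from datetime import datetime

es = elasticsearch.Elasticsearch(timeout=2000000)

def update_actions():
    for hit in helpers.scan(es, index="corpus", scroll='360s', size=1000):
        keyword = hit["_source"]["keyword"]
        yield {
            "_op_type": "update",   # partial update instead of a full re-index
            "_index": hit["_index"],
            "_type": "items",
            "_id": hit["_id"],
            "doc": {
                "keywordLength": len(keyword.split()),
                "timestampUpdated": datetime.now().isoformat(),
            },
        }

# Four worker threads issue bulk requests concurrently; each action
# addresses a single _id, so no two threads race on the same document.
for ok, info in helpers.parallel_bulk(es, update_actions(), thread_count=4, chunk_size=1000):
    if not ok:
        print info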

2 Answers


What I usually do when I have plenty of data to bulk-update across millions of documents and can't afford the roundtrips is to use the update-by-query plugin. The principle is dead simple: it allows you to run a query with the query DSL and, on all the matching documents, run a script to do whatever you like.

In your case, it would go like this:

curl -XPOST localhost:9200/corpus/update_by_query -d '{
    "query": {
        "match_all": {}
    }, 
    "script": "ctx._source.keywordLength = ctx._source.keyword.split(\" \").size() + 1; ctx._source.timestampUpdated = new Date().format(\"yyyy-MM-dd\");"
}'

Also note that in order to be able to run this, you need to enable scripting in your elasticsearch.yml file:

# before ES 1.6
script.disable_dynamic: false

# since ES 1.6
script.inline: on
Val
  • Thanks, this works. I needed to correct _update_by_query to update_by_query and remove the stray ' next to split(\" \") – Jabb Sep 05 '15 at 07:16
  • `update_by_query` is now included in Elasticsearch, versions 2.3 and above (see the sketch below). – spazm Oct 05 '16 at 23:52
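
For later readers, a sketch of the equivalent call against the built-in API in Elasticsearch 2.3+ (assuming Groovy is still the configured script language and inline scripting is enabled; adapt as needed for newer versions):

curl -XPOST localhost:9200/corpus/_update_by_query -d '{
    "query": {
        "match_all": {}
    },
    "script": {
        "inline": "ctx._source.keywordLength = ctx._source.keyword.split(\" \").size(); ctx._source.timestampUpdated = new Date().format(\"yyyy-MM-dd\")",
        "lang": "groovy"
    }
}'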

I've only found a sliver of information about the context provided to a Groovy script running within Elasticsearch.

Based on that, here's the Groovy equivalent of setting/updating the two fields:

ctx._source.keywordLength = ctx._source.keyword.split(' ').size()
ctx._source.timestampUpdated = new Date().format('yyyy-MM-dd')

I couldn't figure out how the searching and iteration come into play.
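
For what it's worth, here is a sketch (not part of the original answer) of how such a script plugs into the partial-update API mentioned in the question: the searching and iteration stay on the client (e.g. via scan/scroll), which issues one _update call per document ID, while the script itself runs server-side. This assumes ES 1.x with dynamic Groovy scripting enabled; SOME_DOC_ID is a placeholder:

curl -XPOST localhost:9200/corpus/items/SOME_DOC_ID/_update -d '{
    "script": "ctx._source.keywordLength = ctx._source.keyword.split(\" \").size(); ctx._source.timestampUpdated = new Date().format(\"yyyy-MM-dd\")"
}'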

This may also help.

Emmanuel Rosa