
I'm currently migrating our Elasticsearch data to be 2.0 compatible (i.e. no dots in field names) in preparation for an upgrade from 1.x to 2.x.

I've written a program that runs through the data (in batches) that sits in a one-node cluster, and renames the fields, re-indexing the documents using the Bulk API.

At some point it all goes wrong, and the total number of documents coming back from my query (those still to be "upgraded") doesn't change, even though it should be counting down.

Initially I thought that the re-indexing wasn't working. However, when I pick a document and query for it by ID, I can see that it has been changed.

However, when I query documents for a specific field within that document I get two results with the same ID. One of the results has the upgraded field, the other one does not.

On further inspection I can see that they come from different shards:

{
  "took" : 2,
  "timed_out" : false,
  "_shards" : {
    "total" : 5,
    "successful" : 5,
    "failed" : 0
  },
  "hits" : {
    "total" : 2,
    "max_score" : 19.059433,
    "hits" : [ {
      "_shard" : 0,
      "_node" : "FxbpjCyQRzKfA9QvBbSsmA",
      "_index" : "status",
      "_type" : "status",
      "_id" : "http://static.photosite.com/80018335.jpg",
      "_version" : 2,
      "_score" : 19.059433,
      "_source":{"url":"http://static.photosite.com/80018335.jpg","metadata":{"url.path":["http://www.photosite.com/80018335"],"source":["http://www.photosite.com/80018335"],"longitude":["104.507755"],"latitude":["21.601669"]}},
      ...
    }, {
      "_shard" : 3,
      "_node" : "FxbpjCyQRzKfA9QvBbSsmA",
      "_index" : "status",
      "_type" : "status",
      "_id" : "http://static.photosite.com/80018335.jpg",
      "_version" : 27,
      "_score" : 17.607681,
      "_source":{"url":"http://static.photosite.com/80018335.jpg","metadata":{"url_path":["http://www.photosite.com/80018335"],"source":["http://www.photosite.com/80018335"],"longitude":["104.507755"],"latitude":["21.601669"]}},
      ...
    } ]
  }
}

How can I prevent this from happening?

elasticsearch version: 1.7.3

query:

{
  "bool" : {
    "must" : {
      "wildcard" : {
        "metadata.url.path" : "*"
      }
    },
    "must_not" : {
      "wildcard" : {
        "metadata.url_path" : "*"
      }
    }
  }
}

Code to write the document:

        BulkRequestBuilder bulkRequest = destinationConnection.getClient().prepareBulk();
        for(Map<String, Object> doc : batch.getDocs()){
            XContentBuilder builder;
            try {
                builder = XContentFactory.jsonBuilder().startObject();
                for(Map.Entry<String, Object> mapEntry : doc.entrySet()){
                    if(!mapEntry.getKey().equals("id")){
                        builder.field(mapEntry.getKey(), mapEntry.getValue());
                    }
                }
                builder.endObject();
            } catch (IOException e) {
                throw new DocumentBuilderException("Error building request to move items to new parent!", e);
            }

            bulkRequest.add(destinationConnection.getClient().prepareIndex(destinationIndex, destinationType, (String) doc.get("id")).setSource(builder).request());

        }
        // Tried with and without setRefresh
        BulkResponse response = bulkRequest.setRefresh(true).execute().actionGet();
        for(BulkItemResponse itemResponse : response.getItems()){
            if(itemResponse.isFailed()){
                LOG.error("Updating item: {} failed: {}", itemResponse.getFailure().getId(), itemResponse.getFailureMessage());
            }
        }

Update
Could it be refresh/query speed?

The program is set to process batches of 5000 documents, and is not using a scroll query, so I'd expect the total number of results coming back from that query to drop by 5000 every iteration.

In fact this is not happening. The number of documents removed from the total result set shrinks with each iteration, until eventually the total barely changes at all:

10:43:42.220  INFO : Fetching another batch
10:43:51.701  INFO : Found 9260992 matching documents. Processing 5000...
10:43:51.794  INFO : Total remaining: 9260992
10:43:51.813  INFO : Writing batch of 5000 items
10:43:57.261  INFO : Fetching another batch
10:44:06.136  INFO : Found 9258661 matching documents. Processing 5000...
10:44:06.154  INFO : Total remaining: 9258661
10:44:06.158  INFO : Writing batch of 5000 items
10:44:11.369  INFO : Fetching another batch
10:44:19.790  INFO : Found 9256813 matching documents. Processing 5000...
10:44:19.804  INFO : Total remaining: 9256813
10:44:19.807  INFO : Writing batch of 5000 items
10:44:22.684  INFO : Fetching another batch
10:44:31.182  INFO : Found 9255697 matching documents. Processing 5000...
10:44:31.193  INFO : Total remaining: 9255697
10:44:31.196  INFO : Writing batch of 5000 items
10:44:33.852  INFO : Fetching another batch
10:44:42.394  INFO : Found 9255115 matching documents. Processing 5000...
10:44:42.406  INFO : Total remaining: 9255115
10:44:42.409  INFO : Writing batch of 5000 items
10:44:45.152  INFO : Fetching another batch
10:44:51.473  INFO : Found 9254744 matching documents. Processing 5000...
10:44:51.483  INFO : Total remaining: 9254744
10:44:51.486  INFO : Writing batch of 5000 items
10:44:53.853  INFO : Fetching another batch
10:44:59.966  INFO : Found 9254551 matching documents. Processing 5000...
10:44:59.978  INFO : Total remaining: 9254551
10:44:59.981  INFO : Writing batch of 5000 items
10:45:02.446  INFO : Fetching another batch
10:45:07.773  INFO : Found 9254445 matching documents. Processing 5000...
10:45:07.787  INFO : Total remaining: 9254445
10:45:07.791  INFO : Writing batch of 5000 items
10:45:10.237  INFO : Fetching another batch
10:45:15.679  INFO : Found 9254384 matching documents. Processing 5000...
10:45:15.703  INFO : Total remaining: 9254384
10:45:15.712  INFO : Writing batch of 5000 items
10:45:18.078  INFO : Fetching another batch
10:45:23.660  INFO : Found 9254359 matching documents. Processing 5000...
10:45:23.712  INFO : Total remaining: 9254359
10:45:23.725  INFO : Writing batch of 5000 items
10:45:26.520  INFO : Fetching another batch
10:45:31.895  INFO : Found 9254343 matching documents. Processing 5000...
10:45:31.905  INFO : Total remaining: 9254343
10:45:31.908  INFO : Writing batch of 5000 items
10:45:34.279  INFO : Fetching another batch
10:45:40.121  INFO : Found 9254333 matching documents. Processing 5000...
10:45:40.136  INFO : Total remaining: 9254333
10:45:40.139  INFO : Writing batch of 5000 items
10:45:42.381  INFO : Fetching another batch
10:45:47.798  INFO : Found 9254325 matching documents. Processing 5000...
10:45:47.823  INFO : Total remaining: 9254325
10:45:47.833  INFO : Writing batch of 5000 items
10:45:50.370  INFO : Fetching another batch
10:45:57.105  INFO : Found 9254321 matching documents. Processing 5000...
10:45:57.117  INFO : Total remaining: 9254321
10:45:57.121  INFO : Writing batch of 5000 items
10:45:59.459  INFO : Fetching another batch

It looks as though document duplication is rife from the outset.

I've just tried a two-node cluster with cluster health status: green, and the same thing happens.

I'm going to try a single node with no replication next.

Update:
Here is an example before/after bulk processor listener data:

Before:

Item( id=http://static.photosite.com/20160123_093502.jpg, index=status, type=status, op_type=INDEX, version=-3, parent=null, routing=null )

After (BulkResponse indicated no failures):

Item( id=http://static.photosite.com/20160123_093502.jpg, index=status, type=status, op_type=index, version=22)

Things of note:

  1. No parent
  2. No routing
  3. Massive jump in document version

Also not made clear by this snippet is that each item in the beforeBulk request is represented as a successful IndexRequest in the afterBulk request details (ie: none are missing).

Update 2

I think that the initial negative version might have something to do with it: https://discuss.elastic.co/t/negative-version-number-on-snapshot-restore-from-s3-bucket/56642

Update 3

I've just discovered that when I query the documents using curl the versions are positive, ie:

  1. Restore snapshot.
  2. Query for document using curl, version is 2
  3. Query for document using java API, version is -1
  4. Re-indexing the document causes a duplicate (new document with the same ID written to a different shard) with a version of 1.

What's happening here?

ndtreviv
  • This looks weird and impossible... what ES version and what was the query that returned that? – Andrei Stefan Jul 27 '16 at 18:13
  • @AndreiStefan I know, right?! I've updated the question with the requested details. – ndtreviv Jul 28 '16 at 08:57
  • could you register a bulk processor listener and log the beforeBulk and afterBulk info? – geneqew Jul 28 '16 at 10:03
  • @geneqew I'm not sure how to register the listener doing it the way I'm doing it...? Which raises the question - am I doing the bulk request in the best way? – ndtreviv Jul 28 '16 at 10:27
  • Actually, hang fire. I'll make some changes. – ndtreviv Jul 28 '16 at 10:30
  • You may use this as a reference (i'm using spring factory bean), here is my [bulkProcessor](https://gist.github.com/geneqew/9c023d41f330a9e742ccef10c1b58eb5) , with the [listener](https://gist.github.com/geneqew/ce1679a4937c4e388b3da440ea1f5a12). Then to use it similar to your code... from there could you check the logs? – geneqew Jul 28 '16 at 10:51
  • @geneqew Any ideas? – ndtreviv Jul 28 '16 at 14:31
  • Can you take these two documents and try and index them in a new cluster? If you index twice and get 2 documents it looks like ES bug, if you index the 2 documents and receive one there is a problem with your cluster setup. Seems like the documents are on different shards, it may be because you tried manual routing. – raam86 Jul 29 '16 at 09:07
  • Again, there is no manual routing...wait... garr! – ndtreviv Jul 29 '16 at 09:40

2 Answers


Executive Summary:
I am an idiot.

Details:
I started today by learning how elasticsearch routes documents to shards.

It turns out that it uses the following formula: shard = hash(routing) % number_of_primary_shards

By default, routing is the _id of the document, unless you override that when indexing.
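The formula can be sketched in a few lines of Java. This is only an illustration: `String.hashCode()` is a stand-in for Elasticsearch's internal hash function, so the shard numbers it prints will not match a real cluster, and `ShardRoutingSketch`, `shardFor` and the custom routing value are hypothetical names chosen for the example.

```java
public class ShardRoutingSketch {

    // shard = hash(routing) % number_of_primary_shards
    // ASSUMPTION: String.hashCode() stands in for Elasticsearch's own hash.
    // Math.floorMod keeps the result non-negative for negative hash codes.
    public static int shardFor(String routing, int numberOfPrimaryShards) {
        return Math.floorMod(routing.hashCode(), numberOfPrimaryShards);
    }

    public static void main(String[] args) {
        int shards = 5;
        String id = "http://static.photosite.com/80018335.jpg";
        String customRouting = "static.photosite.com"; // hypothetical routing value

        // Default behaviour: the routing value is the document _id.
        System.out.println("routed by _id:     shard " + shardFor(id, shards));
        // With an explicit routing value the same _id can land elsewhere.
        System.out.println("routed explicitly: shard " + shardFor(customRouting, shards));
    }
}
```

The point is that the shard depends only on the routing value, so the same `_id` indexed with two different routing values can end up on two different shards.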

Everyone was mentioning that I was doing routing, but I was adamant that I was not. And that was the problem!!!

I had restored a snapshot of data. The data in the index I was attempting to upgrade was originally written by a program called stormcrawler.

stormcrawler does use routing to index these documents, but because I wasn't using routing to re-index them, it was creating apparent duplicates on different shards.
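A toy in-memory model shows why this produces two hits for one ID. It is a sketch under assumptions, not Elasticsearch code: `RoutingDuplicateSketch` and its methods are hypothetical, `String.hashCode()` again stands in for the real hash, and the IDs `"a"` and `"b"` are chosen only because they hash to different shards in this model.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RoutingDuplicateSketch {

    // One map per shard: _id -> source. A write only overwrites within its shard.
    private final List<Map<String, String>> shards = new ArrayList<>();

    public RoutingDuplicateSketch(int numberOfPrimaryShards) {
        for (int i = 0; i < numberOfPrimaryShards; i++) {
            shards.add(new HashMap<>());
        }
    }

    // routing == null models the default: fall back to the document _id.
    public void index(String id, String routing, String source) {
        String key = (routing != null) ? routing : id;
        int shard = Math.floorMod(key.hashCode(), shards.size()); // stand-in hash
        shards.get(shard).put(id, source);
    }

    // A search fans out to every shard, so each copy is returned as a hit.
    public int countById(String id) {
        int hits = 0;
        for (Map<String, String> shard : shards) {
            if (shard.containsKey(id)) {
                hits++;
            }
        }
        return hits;
    }

    public static void main(String[] args) {
        RoutingDuplicateSketch withoutRouting = new RoutingDuplicateSketch(5);
        withoutRouting.index("a", "b", "{\"url.path\": \"...\"}");  // original write, custom routing
        withoutRouting.index("a", null, "{\"url_path\": \"...\"}"); // re-index, routing omitted
        System.out.println("hits without routing: " + withoutRouting.countById("a"));

        RoutingDuplicateSketch withRouting = new RoutingDuplicateSketch(5);
        withRouting.index("a", "b", "{\"url.path\": \"...\"}");
        withRouting.index("a", "b", "{\"url_path\": \"...\"}");     // same routing: overwrite
        System.out.println("hits with routing:    " + withRouting.countById("a"));
    }
}
```

In the real Java client the equivalent fix is to pass the original routing value on each index request, for example via `IndexRequestBuilder.setRouting(...)`, so that the new version overwrites the old one on the same shard.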

Once again, elasticsearch rules and I suck.

Sorry for everyone whose time I wasted on this. I'm now going to lie down in a dark room and cry.

ndtreviv

For the next person who ends up here: I ran into this issue when inheriting an elasticsearch-1.2.1 instance. Copying the /var/lib/elasticsearch files is not enough. To fix the issue where multiple records have the same ID, I did the following:

  • Export all of the records using elasticdump

    elasticdump --input='http://localhost:9200/your-index' --output=your-index.json

  • Delete all records in the index (it's important not to delete the index itself because you don't want to rebuild the metadata)

    curl -XDELETE 'localhost:9200/your-index/_query?q=*'

  • Import all of the records using elasticdump

    elasticdump --input=./your-index.json --output=http://localhost:9200/your-index

This removed the duplicates for me and resolved the sharding issue.

Additionally, you can identify whether it's a sharding issue by adding a preference parameter to your search and restricting it to particular shards.
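As a rough sketch of that check, the helper below builds one search URL per shard using the `preference=_shards:N` form of the preference parameter. The class name, method name, host and index name are placeholders for illustration. Append your own query to each URL and compare which shards return the ID.

```java
import java.util.ArrayList;
import java.util.List;

public class ShardPreferenceSketch {

    // Builds one _search URL per primary shard, each restricted to that shard.
    // baseUrl and index are placeholders; adjust them to your cluster.
    public static List<String> perShardSearchUrls(String baseUrl, String index,
                                                  int numberOfPrimaryShards) {
        List<String> urls = new ArrayList<>();
        for (int shard = 0; shard < numberOfPrimaryShards; shard++) {
            urls.add(baseUrl + "/" + index + "/_search?preference=_shards:" + shard);
        }
        return urls;
    }

    public static void main(String[] args) {
        for (String url : perShardSearchUrls("http://localhost:9200", "your-index", 5)) {
            System.out.println(url);
        }
    }
}
```

If the same ID comes back from more than one of these per-shard searches, the copies were written with different routing values.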

Tareq Saif