1

Note: This kind of query has been asked previously 2 or 3 times years ago but did not have any satisfactory answer. I am posting my specific problem here. Hope, someone suggests some good solution.

I am facing a challenge fetching desired records from elasticsearch. We strictly need filtering on the results returned by TOP aggregation. Anyway, below is my scenario:

Given: We have an entity named "service" which have attributes like below:

{
    "id": "servicer-id-1",
    "status": "OPEN",         // These may be CLOSED, RESOLVED
    "timeRaised": "2019-03-21T15:09:17.015Z",
    "timeChanged": "2019-03-21T15:09:17.015Z"
}

I have an elastic index where any change in the above service is stored as a whole service document(a kind of history of service). There are more than one service with same id. We update timeChanges attribute everytime.

There are millions of service documents in the index.

Problem Statement: We need particular services which were the latest state during a given time frame(timeChanged) and status OPEN.

What I did: I used below query with the scroll API with 10000 bacth size to resolve our problem:

{
  "size" : 1000,   //given by user
  "query" : {
    "constant_score" : {
      "filter" : {
        "bool" : {
          "must" : [
            {
              "range" : {
                "timeChanged" : {
                  "from" : 1552940830000,
                  "to" : 1553498830000,
                  "include_lower" : true,
                  "include_upper" : true,
                  "boost" : 1.0
                }
              }
            }
          ],
          "disable_coord" : false,
          "adjust_pure_negative" : true,
          "boost" : 1.0
        }
      },
      "boost" : 1.0
    }
  },
  "post_filter": {
    "bool": {
        "must": [{
            {
                "constant_score": {
                    "filter": {
                        "terms": {
                            "status": ["OPEN"],
                            "boost": 1.0
                        }
                    },
                    "boost": 1.0
                }
            }
        }],
      "disable_coord" : false,
      "adjust_pure_negative" : true,
      "boost" : 1.0
    }
  },
  "_source" : false,
  "aggregations" : {
    "by_serviceId" : {
      "terms" : {
        "field" : "id",
        "size" : 50000,        // we set it with total number of services exist
        "min_doc_count" : 1,
        "shard_min_doc_count" : 0,
        "show_term_doc_count_error" : false,
        "order" : [
          {
            "_count" : "desc"
          },
          {
            "_term" : "asc"
          }
        ]
      },
      "aggregations" : {
        "top" : {
          "top_hits" : {
            "from" : 0,
            "size" : 1,
            "version" : false,
            "explain" : false,
            "sort" : [
              {
                "timeChanged" : {
                  "order" : "desc"
                }
              }
            ]
          }
        }
      }
    }
  }
}

From above query, we are getting aggregation from first hit of scroll which are the list of latest state of service in aggregation. And by Post filter we are fetching OPEN service in batches of 10000 and try to match the ids(by java code) with aggregation list to find out our candidate.

It is taking too much time to return the desired output. Around 8 mins for 4.4M records in the index.

This problem can be solved if you suggest a way to put filter on returned aggregated data. But after searching so many places, I found out that it is not supported in elastic. Is it so? Ref of same problem:

Elasticsearch: filter top hits aggregation

Elasticsearch exclude top hit on field value

Please help and suggest better way to fulfill the scenario.

Thanks.

Disclaimer: Please do not suggest to apply query and then aggregation because it won't solve the problem. e.g. If I filter on OPEN status first and then aggregate so, for a given date I always get OPEN service but in reality for a given day, service might be RESOLVED.

Moksh
  • 185
  • 1
  • 6
  • 16

1 Answers1

1

here is my attempt to fulfill your need. I have a proof of concept aggregation by it cant work with a string status. So we need first to translate the string status to a number ( maybe an update by query could do the job for you)

In my example

OPEN => status_number = 1 
CLOSED => status_number = 2 
RESOLVED => status_number = 3 

And here is my 50 cents request :D

POST <index>/doc/_search
{
  "size": 0,
  "query": {
    "bool": {
      "filter": {
        "range": {
          "timeChanged": {
            "gte": "2019-03-21T15:09:17.015Z",
            "lte": "2019-03-21T15:09:18.015Z"
          }
        }
      }
    }
  },
  "aggs": {
    "service": {
      "terms": {
        "field": "id.keyword",
        "size": 10
      },
      "aggs": {
        "last_status": {
          "terms": {
            "field": "status_number",
            "size": 1,
            "order": {
              "last_change": "desc" // order to keep the last status of the timespan with the size of 1
            }
          },
          "aggs": {
            "last_change": {
              "max": {
                "field": "timeChanged"
              }
            }
          }
        },
        "min_status": {
          "min_bucket": {
            "buckets_path": "last_status._key" // used to transforms a bucket list in a single value for the bucket_selector beneath
          }
        },
        "filtered": {
          "bucket_selector": {
            "buckets_path": {
              "key": ">min_status"
            },
            "script": """
              params.key == 1 // filter buckets where last status_number is 1 si status = OPEN
            """
          }
        }
      }
    }
  }
}

The output is quite verbose :

{
  "took": 2,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 6,
    "max_score": 0,
    "hits": []
  },
  "aggregations": {
    "service": {
      "doc_count_error_upper_bound": 0,
      "sum_other_doc_count": 0,
      "buckets": [
        {
          "key": "servicer-id-4",
          "doc_count": 1,
          "last_status": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
              {
                "key": 1,
                "doc_count": 1,
                "last_change": {
                  "value": 1553180958015,
                  "value_as_string": "2019-03-21T15:09:18.015Z"
                }
              }
            ]
          },
          "min_status": {
            "value": 1,
            "keys": [
              "1"
            ]
          }
        }
      ]
    }
  }
}

But you just need the aggregations.service.buckets.key values

I hope it can help you but of course without data i cant evaluate the performance of this query.

Pierre Mallet
  • 7,053
  • 2
  • 19
  • 30
  • Thanks for your interest and effort. I analysed and thought of trying your suggestion but I am not able to meet integer constraint with that update by query and also there are other fields besides status used by user which are also String type(to make question straight, I did not mention). Do you have other thoughts by the way? – Moksh Jun 19 '19 at 16:54