6

What would be the best way to query Elasticsearch in order to implement a date histogram representing the total number of unique visitors metric?

Considering the following data:

PUT /events
{
    "mappings" : {
        "_doc" : {
            "properties" : {
                "userId" : { "type" : "keyword" },
                "eventDate" : { "type" : "date" }
            }
        }
    }
}

POST /events/_bulk
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "1" } }
{"userId": "1","eventDate": "2019-03-04T13:40:18.514Z"}
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "2" } }
{"userId": "2","eventDate": "2019-03-04T13:46:18.514Z"}
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "3" } }
{"userId": "3","eventDate": "2019-03-04T13:50:18.514Z"}
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "4" } }
{"userId": "1","eventDate": "2019-03-05T13:46:18.514Z"}
{ "index" : { "_index" : "events", "_type" : "_doc", "_id" : "5" } }
{"userId": "4","eventDate": "2019-03-05T13:46:18.514Z"}

Now, if I query the cardinality of the userId field I get the 4 distinct visitors.

POST /events/_search
{
    "size": 0,
    "aggs": {
        "visitors": {
            "cardinality": {
                "field": "userId"
            }
        }
    }
}

However, distributing the documents over a date histogram, I get a total sum of 5 because there's a repeated userId in both buckets.

POST /events/_search
{
    "size": 0,
    "aggs": {
        "visits_over_time": {
            "date_histogram": {
                "field": "eventDate",
                "interval": "1d"
            },
            "aggs": {
                "visitors": {
                    "cardinality": {
                        "field": "userId"
                    }
                }
            }
        }
    }
}

Is there a way to filter out those repeated values? What would be the best way to accomplish this?

AndreLDM

3 Answers

6

We faced the same issue in our code. Our solution was a Terms Aggregation on the userId field with a nested Min Aggregation on the date field. This gives you one bucket per userId, each containing the timestamp of that user's first visit. We run this aggregation outside of the date histogram and map the results into the histogram manually afterwards.

"aggs": {
    "UniqueUsers": {
        "terms": {
            "field": "userId",
            "size": 1000
        },
        "aggs": {
            "FirstSeen": {
                "min": {
                    "field": "eventDate"
                }
            }
        }
    }
}

This works for us, but I am sure there is a better implementation.
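The manual mapping step is not shown in the answer. A minimal Python sketch of one way to do it follows, assuming the response buckets have already been extracted from `aggregations.UniqueUsers.buckets` and that the `min` aggregation reports the date as epoch milliseconds in `value`. The function name `first_visit_histogram` and the sample buckets are hypothetical, written by hand from the question's five events rather than copied from a real response.

```python
from collections import Counter
from datetime import datetime, timezone


def first_visit_histogram(buckets):
    """Count users by the UTC day of their first visit."""
    counts = Counter()
    for bucket in buckets:
        # Assumption: `min` on a date field returns epoch milliseconds.
        first_seen_ms = bucket["FirstSeen"]["value"]
        day = datetime.fromtimestamp(first_seen_ms / 1000, tz=timezone.utc).date()
        counts[day.isoformat()] += 1
    return dict(sorted(counts.items()))


# Hand-written buckets mirroring the question's sample data (hypothetical).
sample_buckets = [
    {"key": "1", "doc_count": 2, "FirstSeen": {"value": 1551706818514.0}},
    {"key": "2", "doc_count": 1, "FirstSeen": {"value": 1551707178514.0}},
    {"key": "3", "doc_count": 1, "FirstSeen": {"value": 1551707418514.0}},
    {"key": "4", "doc_count": 1, "FirstSeen": {"value": 1551793578514.0}},
]

histogram = first_visit_histogram(sample_buckets)
print(histogram)
```

Each user lands in exactly one day, so the bucket counts add up to the number of distinct users. Note that the terms aggregation only returns as many users as its `size` allows.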

aHochstein
  • Do you mean to first get all userIds/dates and then on the client's memory aggregate them into a histogram? – AndreLDM Mar 15 '19 at 17:44
  • Kind of. This gives you a bucket for every user, containing a metric with the first reading; you then only have to map that reading into the histogram. – aHochstein Mar 18 '19 at 09:04
1

The user IDs repeat, but on different days, so a per-day histogram counts a repeated ID once in every day where it appears. Even then, an ID that occurs more than once on the same day can still be counted repeatedly if the interval is finer than a day. With one-day intervals, a total of 5 is correct: the 4th has 3 IDs and the 5th has 2, and one ID appears on both days. If you widen the interval to a week or a month, that duplicate is counted once.

I'm sure you have come across this but give it another look as it is explaining your exact use case. Link

Basically, the query returns the unique visitors for each given day. If you don't care about individual users and just want to know how many there are, you need a different approach, perhaps a group-by query.
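The difference between per-bucket uniqueness and uniqueness across the whole range can be reproduced locally. The following Python sketch is only an illustration on the question's five sample events, reduced to (userId, day) pairs. It does not call Elasticsearch, and the helper name `distinct_per_bucket` is made up for this example.

```python
from collections import defaultdict

# The question's sample events as (userId, day) pairs.
events = [
    ("1", "2019-03-04"),
    ("2", "2019-03-04"),
    ("3", "2019-03-04"),
    ("1", "2019-03-05"),
    ("4", "2019-03-05"),
]


def distinct_per_bucket(events):
    """Distinct userIds inside each day, like cardinality nested in a date histogram."""
    users_by_day = defaultdict(set)
    for user_id, day in events:
        users_by_day[day].add(user_id)
    return {day: len(users) for day, users in sorted(users_by_day.items())}


per_day = distinct_per_bucket(events)
overall = len({user_id for user_id, _ in events})

print(per_day)                 # distinct users inside each daily bucket
print(sum(per_day.values()))   # user "1" is counted in both days
print(overall)                 # distinct users across the whole range
```

The per-day counts are each correct on their own, but summing them counts user "1" twice, which is where the gap between 5 and 4 comes from.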

RisingSun
  • Exactly, that's the whole point: I need user IDs to be unique across the whole selected range, not only within a single bucket (I also have day-long and week-long buckets in 7, 30 and 90 day ranges). The query in the question is the same as the one in the link you posted, but that query does not accomplish what I need either: each color is counted once for each month, but it's not unique across all buckets. Can you provide a sample of how to solve this with the group by query you mentioned? – AndreLDM Mar 15 '19 at 17:57
  • What exactly is your use case? I assumed you wanted to show unique visitors given an interval of time. If so, what you have already works. The above query gives you unique visitors on a daily basis. If you want unique visitors during a 7 day interval, you would change the `1d` to `1w`. Then that would give you unique visitors on a weekly basis. Perhaps you need to explain your use case more. – RisingSun Mar 15 '19 at 19:07
  • Focus on the sample case, forget about the bucket sizes I mentioned. Imagine that all documents on the index are from the same month, what I need is very simple: a date histogram with 1 day interval (day-long buckets) of unique visitors of the whole month, only the first event should be counted. – AndreLDM Mar 15 '19 at 19:46
0

Even though I would like to avoid scripts, Scripted Metric Aggregation seems to be the only way to accomplish what was requested:

{
    "size": 0,
    "aggs": {
        "visitors": {
            "scripted_metric": {
                "init_script": "params._agg.dateMap = new HashMap();",
                "map_script": "params._agg.dateMap.merge(doc.userId[0].toString(), doc.eventDate.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2);",
                "combine_script": "return params._agg.dateMap;",
                "reduce_script": "def dateMap = new HashMap(); for (map in params._aggs) { if (map == null) continue; for (entry in map.entrySet()) dateMap.merge(entry.key, entry.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2); } def hist = new TreeMap(); for (entry in dateMap.entrySet()) hist.merge(entry.value.toString(), 1, (a, b) -> a + 1); return hist;"
            }
        }
    }
}

Init just creates an empty HashMap, Map fills that map with userId as the key and the oldest eventDate seen so far as the value, and Combine just returns the map so it can be passed to Reduce. The Reduce script, formatted for readability:

def dateMap = new HashMap();
for (map in params._aggs) {
    if (map == null) continue;
    for (entry in map.entrySet())
        dateMap.merge(entry.key, entry.value, (e1, e2) -> e1.isBefore(e2) ? e1 : e2);
}

def hist = new TreeMap();
for (entry in dateMap.entrySet())
    hist.merge(entry.value.toString(), 1, (a, b) -> a + 1);
return hist;

Up to and including Combine, the code is executed on each shard. Reduce then merges all the per-shard maps into one (i.e. dateMap), preserving the oldest eventDate per userId, and finally counts the occurrences of each eventDate.
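To make the data flow easier to follow, here is a plain Python imitation of the Map and Reduce phases. It is a sketch, not Painless: the split of the sample documents across two shards is invented, the function names `map_shard` and `reduce_shards` are hypothetical, and dates are compared as ISO-8601 strings, which only works because they all share the same UTC format.

```python
def map_shard(docs):
    """Per-shard phase: keep the earliest eventDate seen for each userId."""
    first_seen = {}
    for user_id, event_date in docs:
        if user_id not in first_seen or event_date < first_seen[user_id]:
            first_seen[user_id] = event_date
    return first_seen


def reduce_shards(shard_maps):
    """Merge the per-shard maps, then count users per first-seen timestamp."""
    merged = {}
    for shard_map in shard_maps:
        if shard_map is None:  # mirrors the null check in the reduce script
            continue
        for user_id, event_date in shard_map.items():
            if user_id not in merged or event_date < merged[user_id]:
                merged[user_id] = event_date
    hist = {}
    for event_date in merged.values():
        hist[event_date] = hist.get(event_date, 0) + 1
    return dict(sorted(hist.items()))


# Invented shard layout: user "1" has one event on each shard.
shard_a = [("1", "2019-03-05T13:46:18.514Z"), ("4", "2019-03-05T13:46:18.514Z")]
shard_b = [
    ("1", "2019-03-04T13:40:18.514Z"),
    ("2", "2019-03-04T13:46:18.514Z"),
    ("3", "2019-03-04T13:50:18.514Z"),
]

result = reduce_shards([map_shard(shard_a), None, map_shard(shard_b)])
print(result)
```

User "1" shows up on both shards, and the merge keeps only the earlier timestamp, so the later event on 2019-03-05 is attributed to user "4" alone.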

The result is:

"aggregations": {
    "visitors": {
        "value": {
            "2019-03-04T13:40:18.514Z": 1,
            "2019-03-04T13:46:18.514Z": 1,
            "2019-03-04T13:50:18.514Z": 1,
            "2019-03-05T13:46:18.514Z": 1
        }
    }
}

The only missing part is that those values still have to be grouped into a histogram in application code.
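That last grouping step could look something like the Python sketch below. It assumes the `value` object from the response above has been loaded into a dict and that day-long UTC buckets are wanted. The function name `group_by_day` is hypothetical.

```python
from collections import Counter
from datetime import datetime


def group_by_day(first_seen_counts):
    """Sum the per-timestamp counts into day-long buckets."""
    counts = Counter()
    for timestamp, n in first_seen_counts.items():
        # Older Python versions reject a trailing "Z" in fromisoformat().
        parsed = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        counts[parsed.date().isoformat()] += n
    return dict(sorted(counts.items()))


# The "value" object from the aggregation result shown above.
value = {
    "2019-03-04T13:40:18.514Z": 1,
    "2019-03-04T13:46:18.514Z": 1,
    "2019-03-04T13:50:18.514Z": 1,
    "2019-03-05T13:46:18.514Z": 1,
}

daily = group_by_day(value)
print(daily)
```

For other bucket sizes, only the key derivation changes, for example truncating to the start of the ISO week instead of the date.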

Note¹: Use at your own risk. I don't know how much those hash maps increase memory consumption or how well this performs on large datasets.

Note²: Starting from Elasticsearch 6.4, state and states should be used instead of params._agg and params._aggs.

AndreLDM