
I recently added a GeoIP processor to my ingestion pipeline in Elasticsearch. This works well and adds new fields to newly ingested documents. I wanted to add the GeoIP fields to older data by running an _update_by_query on an index; however, it seems that the endpoint doesn't accept "processors" as a parameter.

What I want to do is something like this:

POST my_index*/_update_by_query
{
 "refresh": true,
 "processors": [
   {
     "geoip" : {
        "field": "doc['client_ip']",
        "target_field" : "geo",
        "database_file" : "GeoLite2-City.mmdb",
        "properties":["continent_name", "country_iso_code", "country_name", "city_name", "timezone", "location"]
    }
   }
 ],
 "script": {
  "day_of_week": {
    "type": "long",
    "script": "emit(doc['@timestamp'].value.withZoneSameInstant(ZoneId.of(doc['geo.timezone'])).getDayOfWeek().getValue())"
  },
  "hour_of_day": {
    "type": "long",
    "script": "emit(doc['@timestamp'].value.withZoneSameInstant(ZoneId.of(doc['geo.timezone'])).getHour())"
  },
  "office_hours": {
    "script": "if (doc['day_of_week'].value< 6 && doc['day_of_week'].value > 0) {if (doc['hour_of_day'].value> 7 && doc['hour_of_day'].value<19) {return 1;} else {return -1;} } else {return -1;}"
  }
 }
}

I receive the following error:

{
  "error" : {
    "root_cause" : [
      {
        "type" : "parse_exception",
        "reason" : "Expected one of [source] or [id] fields, but found none"
      }
    ],
    "type" : "parse_exception",
    "reason" : "Expected one of [source] or [id] fields, but found none"
  },
  "status" : 400
}
Saba Far

1 Answer


Since you have the ingestion pipeline ready, you simply need to reference it in your call to the _update_by_query endpoint, like this:

POST my_index*/_update_by_query?pipeline=my-pipeline
                                    ^
                                    |
                                 add this
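
For completeness, here is a minimal end-to-end sketch, assuming the ingest pipeline is named my-pipeline (the name is illustrative) and contains the geoip processor from the question. Note that the geoip processor's field option takes a plain field name (client_ip), not a doc['...'] accessor, and that refresh is a URL parameter for _update_by_query rather than a body field:

# Hypothetical pipeline name; reuse whatever name your existing pipeline has
PUT _ingest/pipeline/my-pipeline
{
  "processors": [
    {
      "geoip": {
        "field": "client_ip",
        "target_field": "geo",
        "database_file": "GeoLite2-City.mmdb",
        "properties": ["continent_name", "country_iso_code", "country_name", "city_name", "timezone", "location"]
      }
    }
  ]
}

# Re-run the pipeline over the documents already in the index
POST my_index*/_update_by_query?pipeline=my-pipeline&refresh=true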
Val
  • I didn't get any errors when I ran it, but I also don't see the fields populated for the older data in Kibana. I have added the fields to the mapping too, but no success so far. – Saba Far Nov 18 '21 at 15:39
  • Are you sure those documents have the `client_ip` field and it is a public IP address? – Val Nov 18 '21 at 16:07
  • They do. I think this is what happened: I had implemented the new geoip and script processors in an older ingestion pipeline, and that was running fine for the newly ingested data. But for the old data, to avoid re-running the old processors in that older pipeline, I put the new processors in a new ingestion pipeline and ran the update_by_query with that new pipeline. The problem, I think, was that this new pipeline is not the one Logstash routes to, so it had no effect. I ran the update_by_query with the old pipeline and now the new fields have been populated. Thank you for your help. – Saba Far Nov 18 '21 at 17:13
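
As a side note on the resolution described in the comments: to avoid re-running the older processors over old documents, an alternative sketch (not from the thread) is to keep the new processors in their own pipeline and restrict the update to documents that don't yet have the geo field:

# my-new-pipeline is hypothetical and would contain only the new geoip/script processors
POST my_index*/_update_by_query?pipeline=my-new-pipeline
{
  "query": {
    "bool": {
      "must_not": {
        "exists": { "field": "geo" }
      }
    }
  }
}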