I am using Logstash together with logstash-input-jdbc to ship data to Elasticsearch. I'm running it as a service on a Windows machine using NSSM. This is the config file (I've cut some repetitive code from the filters):
input {
  jdbc {
    jdbc_driver_library => "C:\Logstash\lib\sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://myServer;databaseName=myDatabase;user=myUser;password=myPassword;"
    jdbc_user => "myUser"
    jdbc_password => "myPassword"
    statement => "EXECUTE dbo.MyLogstashProcedure;"
    schedule => "* * * * *"
  }
}
filter {
  # Drop fields that come back from the procedure as NULL
  ruby {
    code => "
      hash = event.to_hash
      hash.each do |k, v|
        if v == nil
          event.remove(k)
        end
      end"
  }
  # Format RelatedData XMLs
  xml {
    source => "relatedproducts"
    target => "parsed_relatedproducts"
  }
  xml {
    source => "relatedcountries"
    target => "parsed_relatedcountries"
  }
  xml {
    source => "reportpermissions"
    target => "parsed_reportpermissions"
  }
  # Format RelatedProducts XML into objects.
  # The xml filter wraps every child element in an array,
  # so unwrap single-element arrays into plain values.
  if [relatedproducts] {
    ruby {
      code => "
        event['parsed_relatedproducts']['RelatedProducts'].each do |x|
          x.each do |key, value|
            if value.count == 1
              x[key] = value[0]
            end
          end
        end"
    }
  }
  # Format RelatedCountries XML into objects
  if [relatedcountries] {
    ruby {
      code => "
        event['parsed_relatedcountries']['RelatedCountries'].each do |x|
          x.each do |key, value|
            if value.count == 1
              x[key] = value[0]
            end
          end
        end"
    }
  }
  # Rename fields back to their real names (the JDBC driver auto-lowercases
  # them and Elasticsearch is case-sensitive), then drop the helper fields.
  mutate {
    rename => {
      "itemsourceid" => "ItemSourceID"
      "itemtypeid" => "ItemTypeID"
      "title" => "Title"
      "datepublished" => "DatePublished"
      "[parsed_relatedproducts][RelatedProducts]" => "[RelatedProducts]"
      "[parsed_relatedcountries][RelatedCountries]" => "[RelatedCountries]"
      "[parsed_reportpermissions][ReportPermissions]" => "[ReportPermissions]"
    }
    remove_field => [
      "@version",
      "@timestamp",
      "relatedproducts",
      "relatedcountries",
      "reportpermissions",
      "parsed_relatedproducts",
      "parsed_relatedcountries",
      "parsed_reportpermissions"
    ]
  }
}
output {
  #stdout { codec => rubydebug }
  elasticsearch {
    host => "localhost"
    protocol => "http"
    action => "index"
    index => "myIndex"
    document_type => "myType"
    document_id => "%{documentId}"
  }
}
So as per my understanding:
- Every minute Logstash will execute EXECUTE dbo.MyLogstashProcedure;
- It will do the following ETL:
  - remove fields which are nil;
  - parse the XML columns into sub-documents and unwrap single-element arrays (see the sketch just below);
  - rename fields back to their real names (the JDBC driver lowercases column names);
  - remove fields which are no longer needed.
- It will index the data to the Elasticsearch cluster.
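For illustration, here is the flattening step in isolation. This is a sketch, not my exact production config: the nil guards are assumptions I've added to make it standalone.

# Sketch: unwrap the single-element arrays the xml filter produces, e.g.
#   before: { "ProductID" => ["1"], "ProductName" => ["Packaged Food"] }
#   after:  { "ProductID" => "1",   "ProductName" => "Packaged Food" }
# The nil guards are extra safety checks, not part of my real config.
if [relatedproducts] {
  ruby {
    code => "
      parsed = event['parsed_relatedproducts']
      if parsed && parsed['RelatedProducts']
        parsed['RelatedProducts'].each do |x|
          x.each do |key, value|
            x[key] = value[0] if value.is_a?(Array) && value.count == 1
          end
        end
      end"
  }
}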
The ETL'ed document looks like this. Note that the ReportPermissions array can sometimes be huge, storing up to 20 thousand records.
{
  "RelatedCountries": [
    {
      "CountryCode": "MX",
      "CountryName": "Mexico",
      "CountryPermissions": ["4", "122", "11"]
    }
  ],
  "RelatedProducts": [
    {
      "ProductID": "1",
      "ProductName": "Packaged Food"
    },
    {
      "ProductID": "2",
      "ProductName": "Packaged Food",
      "ProductPermissions": ["19", "29", "30", "469"]
    }
  ],
  "Title": "Packaged Food in Mexico",
  "ItemTypeID": "3",
  "ItemSourceID": "2",
  "DatePublished": "2014-11-27T00:00:00",
  "ReportPermissions": ["p19c4", "p19c11", "p19c122", "p29c4", "p29c11", "p29c122", "p30c4", "p30c11", "p30c122", "p281c4", "p281c11", "p281c122", "p285c4", "p285c11", "p285c122", "p286c4", "p286c11", "p286c122", "p292c4", "p292c11", "p292c122", "p294c4", "p294c11", "p294c122", "p295c4", "p295c11", "p295c122", "p297c4", "p297c11", "p297c122", "p298c4", "p298c11", "p298c122", "p299c4", "p299c11", "p299c122", "p469c11", "p515c4", "p515c11", "p515c122", "p516c4", "p516c11", "p516c122", "p517c4", "p517c11", "p517c122", "p518c4", "p518c11", "p518c122", "p519c4", "p519c11", "p519c122", "p520c4", "p520c11", "p520c122", "p521c4", "p521c11", "p521c122"]
}
My Stored Procedure's execution time strongly depends on the data. Sometimes it executes in 10-15 seconds, sometimes in around 40.
Data is indexed fine (every minute) as long as the Stored Procedure returns results within 10-15 seconds or so. When it takes longer to execute, Logstash slows down and starts shipping data only every couple of minutes, or even stops shipping altogether.
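My working assumption (unconfirmed) is that scheduled runs pile up behind a slow execution of the procedure. If that is the cause, relaxing the cron schedule so every run finishes before the next one starts might be a mitigation, for example:

jdbc {
  # ... all other settings exactly as in the config above ...
  schedule => "*/2 * * * *"  # every 2 minutes instead of every minute
}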
Neither stdout nor stderr shows any errors. There is nothing in the Elasticsearch logs either.
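To at least watch what each scheduled run emits, I can temporarily re-enable the stdout output that is commented out in the config above:

output {
  # print every event as it is shipped; remove again once diagnosed
  stdout { codec => rubydebug }
}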
This runs on a Rackspace Virtual Machine:
- OS: Windows Server 2012 R2 x64
- Elasticsearch version: 1.7.3
- Logstash version: 1.5.5
- 2 Virtual CPUs @ 2.59 GHz
- 2GB RAM
- 40GB SSD Drive (plenty of space)
- Java version: 1.8.0_65 x64
Data is being read from the same private network.
What could be a potential solution to fix the indexing?
This is the output using --debug: http://pastebin.com/Z2RWt3pj