1

I am using Logstash together with logstash-input-jdbc in order to ship data to Elasticsearch. I'm running it as a service on Windows machine using NSSM. This is config file (I've cut some repetitive code from filters):

input
{
  jdbc
  {
    jdbc_driver_library => "C:\Logstash\lib\sqljdbc42.jar"
    jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
    jdbc_connection_string => "jdbc:sqlserver://myServer;databaseName=myDatabase;user=myUser;password=myPassword;"
    jdbc_user => "myUser"
    jdbc_password => "myPassword"
    statement => "EXECUTE dbo.MyLogstashProcedure;"
    schedule => "* * * * *"
  }
}
filter
{
  ruby {
    code => "
      hash = event.to_hash
        hash.each do |k,v|
          if v == nil
            event.remove(k)
        end
      end"
  }

  #Format RelatedData XMLs
  xml
  {
    source => "relatedproducts"
    target => "parsed_relatedproducts"
  }
  xml
  {
    source => "relatedcountries"
    target => "parsed_relatedcountries"
  }
  xml
  {
    source => "reportpermissions"
    target => "parsed_reportpermissions"
  }

  # Format RelatedProducts XML into objects
  if [relatedproducts]
  {
    ruby
    {
      code => "
        event['parsed_relatedproducts']['RelatedProducts'].each do |x|
          x.each do |key, value|
            if value.count == 1
              x[key] = value[0]
            end
        end
      end"
    }
  }

  # Format RelatedCountries XML into objects
  if [relatedcountries]
  {
    ruby
    {
      code => "
        event['parsed_relatedcountries']['RelatedCountries'].each do |x|
          x.each do |key, value|
            if value.count == 1
              x[key] = value[0]
            end
        end
      end"
    }
  }

  # Rename fields back to their real names. JDBC Driver auto-lowercases them and Elasticsearch is case-sensitive
  mutate
  {
    rename =>
    {
      "itemsourceid" => "ItemSourceID"
      "itemtypeid" => "ItemTypeID"
      "title" => "Title"
      "datepublished" => "DatePublished"
      "[parsed_relatedproducts][RelatedProducts]" => "[RelatedProducts]"
      "[parsed_relatedcountries][RelatedCountries]" => "[RelatedCountries]"
      "[parsed_reportpermissions][ReportPermissions]" => "[ReportPermissions]"
    }

    remove_field =>
      [
        "@version"
        , "@timestamp"
        , "relatedproducts"
        , "relatedcountries"
        , "reportpermissions"
        , "parsed_relatedproducts"
        , "parsed_relatedcountries"
        , "parsed_reportpermissions"
      ]
  }
}
output {
  #stdout { codec => rubydebug }
  elasticsearch
  {
    host => "localhost"
    protocol => "http"
    action => "index"
    index => "myIndex"
    document_type => "myType"
    document_id => "%{documentId}"
  }  
}

So as per my understanding:

  • Every minute Logstash will execute EXECUTE dbo.MyLogstashProcedure;
  • It will do the following ETL:
    1. Remove fields, which are nil
    2. Parse XML columns into sub-documents
    3. Rename fields back to their real names (JDBC driver lowercase column names)
  • It will index data to Elasticsearch cluster

ETL'ed document looks like this. Also, sometimes ReportsPermissions array can be huge and store up 20 thousands of records.

{
  "RelatedCountries": [
    {
      "CountryCode": "MX",
      "CountryName": "Mexico",
      "CountryPermissions": ["4", "122", "11"]
    }
  ],
  "RelatedProducts": [
    {
      "ProductID": "1",
      "ProductName": "Packaged Food"
    },
    {
      "ProductID": "2",
      "ProductName": "Packaged Food",
      "ProductPermissions": ["19", "29", "30", "469"]
    }
  ],
  "Title": "Packaged Food in Mexico",
  "ItemTypeID": "3",
  "ItemSourceID": "2",
  "DatePublished": "2014-11-27T00:00:00",
  "ReportPermissions": ["p19c4", "p19c11", "p19c122", "p29c4", "p29c11", "p29c122", "p30c4", "p30c11", "p30c122", "p281c4", "p281c11", "p281c122", "p285c4", "p285c11", "p285c122", "p286c4", "p286c11", "p286c122", "p292c4", "p292c11", "p292c122", "p294c4", "p294c11", "p294c122", "p295c4", "p295c11", "p295c122", "p297c4", "p297c11", "p297c122", "p298c4", "p298c11", "p298c122", "p299c4", "p299c11", "p299c122", "p469c11", "p515c4", "p515c11", "p515c122", "p516c4", "p516c11", "p516c122", "p517c4", "p517c11", "p517c122", "p518c4", "p518c11", "p518c122", "p519c4", "p519c11", "p519c122", "p520c4", "p520c11", "p520c122", "p521c4", "p521c11", "p521c122"]
}

My Stored Procedure execution time strongly depends on data. Sometimes it might execute in 10-15 seconds, sometimes in around 40.

Data is indexed fine (every minute) if Stored Procedure brings results back in around 10-15 seconds. Now when it takes longer to execute, I notice that Logstash slows down and starts shipping data every couple of minutes or even completely stops doing that.

Nor stdout neither stderr show any errors. There's nothing in Elasticsearch logs as well.

This runs on Rackspace Virtual Machine.

  • OS: Windows Server 2012 R2 x64
  • Elasticsearch version: 1.7.3
  • Logstash version: 1.5.5
  • 2 Virtual CPUs @ 2.59 GHz
  • 2GB RAM
  • 40GB SSD Drive (plenty of space)
  • Java version: 1.8.0_65 x64

Data is being read from same private network.

What could be potential solution in order to fix indexing?

This is output using --debug: http://pastebin.com/Z2RWt3pj

Community
  • 1
  • 1
Evaldas Buinauskas
  • 13,739
  • 11
  • 55
  • 107
  • Can you try to run logstash with the `--debug` option so we can maybe get some hints from the more verbose output? Also why don't you simply increase the cron to every two minutes instead of every minute since you know one minute can be too short for your stored proc to execute? – Val Jan 25 '16 at 14:06
  • @Val I've added information from `--debug`, I hope I did it right. And of course, I could configure cron to execute it every two minutes, but then if my SP returns data quite quick, I'll end up slowing down overall indexing, that's pretty much it – Evaldas Buinauskas Jan 25 '16 at 14:23

0 Answers0