
I wrote a bash script that finds CSV files in specified folders and pipes them into Logstash with the correct config file. However, when running this script I run into the following error, which says that the shutdown process is stalled and causes an infinite loop until I manually stop it with Ctrl+C:

[2018-03-22T08:59:53,833][INFO ][logstash.runner          ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-03-22T08:59:54,211][INFO ][logstash.agent           ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-22T08:59:57,970][INFO ][logstash.pipeline        ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-22T08:59:58,116][INFO ][logstash.pipeline        ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0xf6851b3 run>"}
[2018-03-22T08:59:58,246][INFO ][logstash.agent           ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-03-22T08:59:58,976][INFO ][logstash.outputs.file    ] Opening file {:path=>"/home/kevin/otrs_customer_user"}
[2018-03-22T09:00:06,471][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{["LogStash::Filters::CSV", {"separator"=>";", "columns"=>["IOT", "OID", "SUM", "XID", "change_by", "change_time", "city", "company", "company2", "create_by", "create_time", "customer_id", "email", "fax", "first_name", "id", "inst_city", "inst_first_name", "inst_last_name", "inst_street", "inst_zip", "last_name", "login", "mobile", "phone", "phone2", "street", "title", "valid_id", "varioCustomerId", "zip"], "id"=>"f1c74146d6672ca71f489aac1b4c2a332ae515996657981e1ef44b441a7420c8"}]=>[{"thread_id"=>23, "name"=>nil, "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:90:in `read_batch'"}]}}
[2018-03-22T09:00:06,484][ERROR][logstash.shutdownwatcher ] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2018-03-22T09:00:11,438][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{["LogStash::Filters::CSV", {"separator"=>";", "columns"=>["IOT", "OID", "SUM", "XID", "change_by", "change_time", "city", "company", "company2", "create_by", "create_time", "customer_id", "email", "fax", "first_name", "id", "inst_city", "inst_first_name", "inst_last_name", "inst_street", "inst_zip", "last_name", "login", "mobile", "phone", "phone2", "street", "title", "valid_id", "varioCustomerId", "zip"], "id"=>"f1c74146d6672ca71f489aac1b4c2a332ae515996657981e1ef44b441a7420c8"}]=>[{"thread_id"=>23, "name"=>nil, "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:90:in `read_batch'"}]}}

When I run the same file with the same config manually, using bash logstash -f xyz.config < myfile.csv, it works as desired and the process terminates properly. In the bash script I'm using essentially the same command, yet I run into the error above.

I also noticed that the problem appears to be random: it does not always occur on the same file and config.

My config consists of a stdin input, a csv filter, and, for testing purposes, an output in JSON format to a file (I also removed stdout{}).

Does anybody have an idea why my process stalls during script execution? Or, if not, is there a way to tell Logstash to shut down when it's stalled?

Sample config:

input {
  stdin {
    id => "${LS_FILE}"
  }
}
filter {
  mutate {
    add_field => { "foo_type" => "${FOO_TYPE}" }
    add_field => { "[@metadata][LS_FILE]" => "${LS_FILE}" }
  }
  if [@metadata][LS_FILE] == "contacts.csv" {
    csv {
      separator => ";"
      columns => [
        "IOT",
        "OID",
        "SUM",
        "XID",
        "kundenid"
      ]
    }
    if [kundenid] {
      mutate {
        update => { "kundenid" => "n-%{kundenid}" }
      }
    }
  }
}
output {
  if [@metadata][LS_FILE] == "contacts.csv" {
    file {
      path => "~/contacts_file"
      codec => json_lines
    }
  }
}

Sample script:

LOGSTASH="/customer/app/logstash-6.2.3/bin/logstash"

for file in $(find "$TARGETPATH" -name '*.csv')  # Loop over each CSV file under the given path
do
    if [[ $file == *"foo"* ]]; then
        echo "Importing $file"
        export LS_FILE="$(basename "$file")"
        bash "$LOGSTASH" -f "$CFG_FILE" < "$file"  # Start Logstash for this file
        echo "file $file imported."
    fi
done
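Following the comments below, a hardened version of the loop would read the file list null-delimited into an array before iterating; this survives whitespace in filenames and narrows the window between find and the redirect. This is a sketch, assuming TARGETPATH and CFG_FILE are set elsewhere as in the original script:

```shell
#!/usr/bin/env bash
LOGSTASH="/customer/app/logstash-6.2.3/bin/logstash"

# Collect CSV paths null-delimited so names with spaces stay intact.
files=()
while IFS= read -r -d '' file; do
    files+=("$file")
done < <(find "$TARGETPATH" -name '*.csv' -print0)

for file in "${files[@]}"; do
    if [[ $file == *"foo"* ]]; then
        echo "Importing $file"
        export LS_FILE="$(basename "$file")"
        bash "$LOGSTASH" -f "$CFG_FILE" < "$file"
        echo "file $file imported."
    fi
done
```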

I export environment variables in the bash script and set them as metadata in the Logstash configs to perform some conditionals for different input files. The output to JSON in a file is just for testing purposes.

Developer Guy
KevKosDev
    Would you please add the actual script and config to the question? The log references "busy or blocked plugins" - what plugins are you loading? – cxw Mar 22 '18 at 12:12
  • Just added an abstract of my script and configs. The weird thing is that the problem seems to occur randomly on different csv files. I cannot reproduce the problem by invoking the config manually. – KevKosDev Mar 22 '18 at 12:24
    How is `LOGSTASH` set? – codeforester Mar 22 '18 at 12:44
  • I added the declaration. Basically just the full path to logstash's bash file. – KevKosDev Mar 22 '18 at 12:46
    Thoughts: **(1)** If any of your filenames have whitespace, insufficient quoting will bite you - use `export LS_FILE="$(basename "$file")"` and `bash "$LOGSTASH" -f "$CFG_FILE" < "$file"`. **(2)** Similar - read filenames with `IFS=... < <(find...)`, e.g., [here](https://stackoverflow.com/a/21663203/2877364). **(3)** There is a race condition between the `find` and the `bash ... < $file`, which might be giving you the inconsistent behaviour. – cxw Mar 22 '18 at 14:10
  • Just corrected all your hints but the problem still appears randomly... I also changed the pipe to `cat "$filename" | bash "$LOGSTASH" -f "$config"` but this did not help either. – KevKosDev Mar 22 '18 at 14:36
  • I also put all filenames into an array and then iterate over that to eliminate the possibility of the race condition you mentioned in point (3). – KevKosDev Mar 22 '18 at 15:10
    The array sounds like a good approach. Unfortunately, the race still exists - files could technically be moved or deleted between `find` and `bash < "$file"`. I am afraid I am not experienced enough with logstash to have any deep suggestions. I do note that the warning message lists a lot more columns than the `csv{}` block in the config file does. Also, is your logstash new enough to include [this](https://github.com/logstash-plugins/logstash-output-http/pull/64)? Good luck! – cxw Mar 22 '18 at 15:19
  • Removal of files can be left out for the moment. I'm working on fixed testing files. The Logstash version is the latest available, so the fix should already be included. Regarding the columns, that's because the config above is just a cut-down sample, but that shouldn't be the problem. It's at least good to know that there's no obvious mistake in the bash execution. Thank you for your help! – KevKosDev Mar 22 '18 at 15:27
  • It's possible my error is caused by a recent logstash bug: https://github.com/elastic/logstash/pull/9285 – KevKosDev Mar 27 '18 at 12:02

1 Answer


Logstash performs several steps when you try to shut it down:

  • Stop all input, filter and output plugins
  • Process all in-flight events
  • Terminate the Logstash process

There are various factors that can make the shutdown process unpredictable, such as:

  • An input plugin receiving data at a slow pace.
  • A slow filter, like a Ruby filter executing sleep(10000) or an Elasticsearch filter that is executing a very heavy query.
  • A disconnected output plugin that is waiting to reconnect to flush in-flight events.

From Logstash documentation,

Logstash has a stall detection mechanism that analyzes the behavior of the pipeline and plugins during shutdown. This mechanism produces periodic information about the count of inflight events in internal queues and a list of busy worker threads.

You can use the --pipeline.unsafe_shutdown flag when starting Logstash to force-terminate the process in case of a stalled shutdown. When --pipeline.unsafe_shutdown isn't enabled, Logstash continues to run and produces these reports periodically; this is why the problem appears to be random in your case.
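The flag can be passed per invocation or set permanently in config/logstash.yml. A sketch, assuming $LOGSTASH, $CFG_FILE, and $file as defined in the question's script:

```shell
# Per invocation, in the question's loop:
bash "$LOGSTASH" --pipeline.unsafe_shutdown -f "$CFG_FILE" < "$file"

# Or permanently, in config/logstash.yml:
#   pipeline.unsafe_shutdown: true
```

Note that this trades a clean shutdown for termination, which is why the data-loss caveat below applies.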

Remember that unsafe shutdowns, force-kills of the Logstash process, or crashes of the Logstash process for any other reason may result in data loss (unless you've enabled Logstash to use persistent queues).

Sufiyan Ghori