I wrote a bash script that finds CSV files in specified folders and pipes them into Logstash with the correct config file. However, when I run this script, I get the following error saying that the shutdown process is stalled. Logstash then keeps logging this in an endless loop until I stop it manually with Ctrl+C:
[2018-03-22T08:59:53,833][INFO ][logstash.runner ] Starting Logstash {"logstash.version"=>"6.2.3"}
[2018-03-22T08:59:54,211][INFO ][logstash.agent ] Successfully started Logstash API endpoint {:port=>9600}
[2018-03-22T08:59:57,970][INFO ][logstash.pipeline ] Starting pipeline {:pipeline_id=>"main", "pipeline.workers"=>2, "pipeline.batch.size"=>125, "pipeline.batch.delay"=>50}
[2018-03-22T08:59:58,116][INFO ][logstash.pipeline ] Pipeline started succesfully {:pipeline_id=>"main", :thread=>"#<Thread:0xf6851b3 run>"}
[2018-03-22T08:59:58,246][INFO ][logstash.agent ] Pipelines running {:count=>1, :pipelines=>["main"]}
[2018-03-22T08:59:58,976][INFO ][logstash.outputs.file ] Opening file {:path=>"/home/kevin/otrs_customer_user"}
[2018-03-22T09:00:06,471][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{["LogStash::Filters::CSV", {"separator"=>";", "columns"=>["IOT", "OID", "SUM", "XID", "change_by", "change_time", "city", "company", "company2", "create_by", "create_time", "customer_id", "email", "fax", "first_name", "id", "inst_city", "inst_first_name", "inst_last_name", "inst_street", "inst_zip", "last_name", "login", "mobile", "phone", "phone2", "street", "title", "valid_id", "varioCustomerId", "zip"], "id"=>"f1c74146d6672ca71f489aac1b4c2a332ae515996657981e1ef44b441a7420c8"}]=>[{"thread_id"=>23, "name"=>nil, "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:90:in `read_batch'"}]}}
[2018-03-22T09:00:06,484][ERROR][logstash.shutdownwatcher ] The shutdown process appears to be stalled due to busy or blocked plugins. Check the logs for more information.
[2018-03-22T09:00:11,438][WARN ][logstash.shutdownwatcher ] {"inflight_count"=>0, "stalling_thread_info"=>{["LogStash::Filters::CSV", {"separator"=>";", "columns"=>["IOT", "OID", "SUM", "XID", "change_by", "change_time", "city", "company", "company2", "create_by", "create_time", "customer_id", "email", "fax", "first_name", "id", "inst_city", "inst_first_name", "inst_last_name", "inst_street", "inst_zip", "last_name", "login", "mobile", "phone", "phone2", "street", "title", "valid_id", "varioCustomerId", "zip"], "id"=>"f1c74146d6672ca71f489aac1b4c2a332ae515996657981e1ef44b441a7420c8"}]=>[{"thread_id"=>23, "name"=>nil, "current_call"=>"[...]/logstash-core/lib/logstash/util/wrapped_synchronous_queue.rb:90:in `read_batch'"}]}}
When I run the same file and the same config manually with bash logstash -f xyz.config < myfile.config
it works as desired and the process terminates properly. In the bash script I'm using essentially the same command, yet I run into the error above.
I also noticed that the problem appears to be random: it does not occur on every run, and not always with the same file and config.
My config consists of a stdin input, a csv filter and, for testing, a JSON output to a file (I also removed stdout{}).
Does anybody have an idea why my process stalls during script execution? If not, is there a way to tell Logstash to shut down when it's stalled?
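For the second question, one generic stopgap is to put a deadline on the child process from the calling script. The sketch below assumes GNU coreutils `timeout` is available; `run_with_deadline` is a hypothetical helper name, and `sleep` stands in for the Logstash call so the example runs anywhere. It does not explain the stall, and force-killing a process can lose events that are still in flight.

```shell
#!/usr/bin/env bash
# Hypothetical helper: run a command with a time limit.
# After $1 seconds timeout sends SIGTERM; if the command is still
# alive 2 seconds later, it sends SIGKILL (-k 2).
run_with_deadline() {
    local limit="$1"
    shift
    timeout -k 2 "$limit" "$@"
}

# Stand-in for the Logstash invocation (assumption: a real call would
# pass the Logstash command and redirect the CSV file to stdin instead).
run_with_deadline 1 sleep 5
status=$?

# timeout reports 124 when the command was stopped by the time limit.
echo "exit status: $status"
```

The calling loop could then check the exit status and log which file hit the deadline instead of hanging on it.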
Sample config:
input {
  stdin {
    id => "${LS_FILE}"
  }
}
filter {
  mutate {
    add_field => { "foo_type" => "${FOO_TYPE}" }
    add_field => { "[@metadata][LS_FILE]" => "${LS_FILE}" }
  }
  if [@metadata][LS_FILE] == "contacts.csv" {
    csv {
      separator => ";"
      columns => [
        "IOT",
        "OID",
        "SUM",
        "XID",
        "kundenid"
      ]
    }
    if [kundenid] {
      mutate {
        update => { "kundenid" => "n-%{kundenid}" }
      }
    }
  }
}
output {
  if [@metadata][LS_FILE] == "contacts.csv" {
    file {
      path => "~/contacts_file"
      codec => json_lines
    }
  }
}
Sample script:
LOGSTASH="/customer/app/logstash-6.2.3/bin/logstash"
# Loop over each CSV file in the given path (assumes paths contain no whitespace)
for file in $(find "$TARGETPATH" -name '*.csv')
do
    if [[ $file = *"foo"* ]]; then
        echo "Importing $file"
        export LS_FILE=$(basename "$file")
        bash "$LOGSTASH" -f "$CFG_FILE" < "$file" # Start Logstash with the file on stdin
        echo "file $file imported."
    fi
done
I export environment variables in the bash script and copy them into metadata fields in the Logstash configs so that I can apply conditionals for different input files. The JSON output to a file is just for testing purposes.
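To illustrate the mechanism this relies on: a variable exported in the script is inherited by every child process started afterwards, which is how `${LS_FILE}` becomes visible to Logstash when it reads the config. A minimal sketch, where the path is made up for illustration and a plain `bash -c` child stands in for Logstash:

```shell
#!/usr/bin/env bash
# Hypothetical input path, used only for this illustration.
file="/tmp/import dir/contacts.csv"

# Export the bare file name so child processes can read it.
LS_FILE="$(basename "$file")"
export LS_FILE

# A child shell stands in for Logstash here; it sees the exported value.
child_view="$(bash -c 'printf "%s" "$LS_FILE"')"
echo "child sees LS_FILE=$child_view"
```

Because the variable is re-exported on every loop iteration, each Logstash run picks up the name of the file it is currently importing.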