
I've been trying to do this all day. I want to send logs from Docker to Fluentd via the fluentd logging driver, and then have Fluentd forward those logs to Logstash for processing.
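For reference, the containers are started with the fluentd log driver along these lines (the image name is a placeholder, and the address/tag are just what my configs below expect):

docker run --log-driver=fluentd \
  --log-opt fluentd-address=localhost:24224 \
  --log-opt tag=docker.json \
  my-image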

I keep getting this error from logstash though:

{:timestamp=>"2016-03-09T23:29:19.388000+0000",
 :message=>"An error occurred. Closing connection",
 :client=>"172.18.0.1:57259", :exception=>#<TypeError: can't convert String into Integer>,
 :backtrace=>["org/jruby/RubyTime.java:1073:in `at'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-core-event-2.2.2-java/lib/logstash/timestamp.rb:27:in `at'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-fluent-2.0.2-java/lib/logstash/codecs/fluent.rb:41:in `decode'", 
"org/msgpack/jruby/MessagePackLibrary.java:195:in `each'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-codec-fluent-2.0.2-java/lib/logstash/codecs/fluent.rb:40:in `decode'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-3.0.2/lib/logstash/inputs/tcp.rb:153:in `handle_socket'", 
"/opt/logstash/vendor/bundle/jruby/1.9/gems/logstash-input-tcp-3.0.2/lib/logstash/inputs/tcp.rb:143:in `server_connection_thread'"], :level=>:error}

fairly basic logstash config:

input {
  tcp {
    port => 4000
    codec => "fluent"
  }
}

output {
  stdout {
  }
}

fairly basic fluentd config:

<source>
  @type forward
</source>


<match docker.json>
  @type forward
  send_timeout 60s 
  recover_wait 10s 
  heartbeat_type none
  phi_threshold 16
  hard_timeout 60s 

  <server>
    name logstash
    host 172.18.0.2
    port 4000
    weight 60
  </server>
</match>

<match docker.**>
  @type stdout
</match>

One would think this would work, but I've already run into two problems with Logstash:

  1. It won't work with Fluentd's out_forward heartbeat configuration.
    • Logstash doesn't open a UDP port on the same port as the TCP input.
  2. It throws the error above.

The above configuration does work if I craft Fluentd MessagePack messages in Ruby and send them manually. The key, though, is that I want Fluentd to manage the logs locally and send them to an external Logstash server, which should process the messages correctly into JSON.
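For reference, the manual test that does work is roughly this (a minimal sketch; the host and port come from the Logstash input above, and the record contents are made up):

require 'socket'
require 'msgpack'

# [tag, time, record] -- note the timestamp is a plain integer,
# not Fluentd's EventTime extension type
event = ['docker.json', Time.now.to_i, { 'log' => 'hello from ruby' }]

socket = TCPSocket.new('172.18.0.2', 4000)
socket.write(MessagePack.pack(event))
socket.close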

Breedly
  • What kind of processing do you do in Logstash? While I can't guarantee it, Fluentd might be able to replace Logstash, depending on the use case. – Kiyoto Tamura Mar 10 '16 at 23:08
  • I'm using the fluentd log driver for docker and the problem I'm having is it isn't parsing my log messages as JSON even though they are. It creates an escaped string representation. – Breedly Mar 12 '16 at 02:18
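(Aside on the JSON issue in the comment above: when the container already logs JSON, one way to expand the escaped string on the Fluentd side is the parser filter. A hedged sketch, assuming a Fluentd version that bundles filter_parser and that the Docker log line ends up in the log field:)

<filter docker.**>
  @type parser
  key_name log
  reserve_data true
  <parse>
    @type json
  </parse>
</filter>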

3 Answers


We found a way to make Fluentd -> Logstash work: set time_as_integer true. A minimal configuration on the Fluentd side would be

<source>
  @type http
  @id input_http
  port 8888
</source>

<match **>
  @type forward
  time_as_integer true
  <server>
    host localhost
    port 24114
  </server>
</match>
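The same option goes on the forward output from the question's config; roughly (reusing the question's tag, host and port, so treat those as assumptions):

<match docker.json>
  @type forward
  time_as_integer true
  <server>
    name logstash
    host 172.18.0.2
    port 4000
  </server>
</match>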

This is mentioned, somewhat hidden, in https://docs.fluentd.org/v0.12/articles/in_forward#i-got-messagepackunknownexttypeerror-error-why . On the Logstash side, use a recent release (6.2.4), then simply configure a tcp input with the fluent codec, like this:

input {
  tcp {
    codec => fluent
    port => 24114
  }
}

filter {
}

output {
  stdout { codec => rubydebug }
}

test with

curl -X POST -d 'json={"json":"message"}' http://localhost:8888/debug.test

as in the documentation. With the time_as_integer setting, the Logstash output will look right, like this:

{
          "port" => 32844,
      "@version" => "1",
          "host" => "localhost",
          "json" => "message",
    "@timestamp" => 2018-04-26T15:14:28.000Z,
          "tags" => [
        [0] "debug.test"
    ]
}

Without it, I get

[2018-04-26T15:16:00,115][ERROR][logstash.codecs.fluent   ] Fluent parse error, original data now in message field {:error=>#<MessagePack::UnknownExtTypeError: unexpected extension type>, :data=>["fluent.info", "\x92\xD7\u0000Z\xE1\xEC\xF4\u0006$\x96傦worker\u0000\xA7message\xD9&fluentd worker is now running worker=0", {"size"=>1, "compressed"=>"text"}]}
{
          "port" => 32972,
      "@version" => "1",
       "message" => [
        [0] "fluent.info",
        [1] "\x92\xD7\u0000Z\xE1\xEC\xF4\u0006$\x96傦worker\u0000\xA7message\xD9&fluentd worker is now running worker=0",
        [2] {
                  "size" => 1,
            "compressed" => "text"
        }
    ],
          "host" => "localhost",
    "@timestamp" => 2018-04-26T15:16:00.116Z,
          "tags" => [
        [0] "_fluentparsefailure"
    ]
}
ynux
  • After a day of googling and many failures, your hint about the timestamp as an integer got my fluentd successfully sending data to a logstash tcp input with the fluent codec. Great find! – Todd Lyons Dec 27 '18 at 00:02

AFAIK, there's no built-in way to transport data from Fluentd to Logstash. You would need to write a Fluentd output plugin to send data to Logstash, or a Logstash input plugin to receive data from Fluentd.

FYI: there are some plugins for the Logstash -> Fluentd direction:

  • fluent-plugin-beats (fluentd input plugin for Elastic beats protocol)
  • logstash-output-fluentd (logstash output plugin to send data to Fluentd)

You can forward the data directly to a Logstash tcp input.

This open-source Fluentd output plugin will send the data directly to a Logstash tcp input (or any other receiver) in JSON format (it also supports SSL/TLS).

Seen first at this question.
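On the Logstash side, the receiving end for that approach is just a plain tcp input with the json codec, for example (the port here is an arbitrary choice):

input {
  tcp {
    port => 5170
    codec => json
  }
}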

dorony