
In a few words, I have this stack: Filebeat reads a certain log file and pushes each line to a Kafka topic; Logstash consumes from that Kafka topic and inserts the events into Elasticsearch. To summarize: file logs -> Filebeat -> Kafka topic -> Logstash -> Elasticsearch.

My docker-compose.yml:

version: '3.2'
services:
  kibana:
    image: docker.elastic.co/kibana/kibana:7.5.2
    volumes:
      - "./kibana.yml:/usr/share/kibana/config/kibana.yml"
    restart: always
    environment:
      - SERVER_NAME=kibana.localhost
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    links:
      - elasticsearch
    depends_on:
      - elasticsearch
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:7.5.2
    environment:
      - cluster.name=docker-cluster
      - bootstrap.memory_lock=true
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.security.enabled=false
      - xpack.watcher.enabled=false
      - discovery.type=single-node
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - "./esdata:/usr/share/elasticsearch/data"
    ports:
      - "9200:9200"
  logstash:
    image: docker.elastic.co/logstash/logstash:7.5.2
    volumes:
      - "./logstash.conf:/config-dir/logstash.conf"
    restart: always
    command: logstash -f /config-dir/logstash.conf
    ports:
      - "9600:9600"
      - "7777:7777"
    links:
      - elasticsearch
      - kafka1
      - kafka2
      - kafka3
  kafka1:
    image: wurstmeister/kafka
    command: [start-kafka.sh]
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    links:
      - zoo1
      - zoo2
      - zoo3
    ports:
      - "9092:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka1:9092
      KAFKA_BROKER_ID: 1
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_RETENTION_BYTES: "100000000"
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_CREATE_TOPICS: "log:3:3"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  kafka2:
    image: wurstmeister/kafka
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    links:
      - zoo1
      - zoo2
      - zoo3
    ports:
      - "9093:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka2:9092
      KAFKA_BROKER_ID: 2
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_RETENTION_BYTES: "100000000"
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_CREATE_TOPICS: "log:3:3"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  kafka3:
    image: wurstmeister/kafka
    depends_on:
      - zoo1
      - zoo2
      - zoo3
    links:
      - zoo1
      - zoo2
      - zoo3
    ports:
      - "9094:9092"
    environment:
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka3:9092
      KAFKA_BROKER_ID: 3
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_LOG_RETENTION_HOURS: "168"
      KAFKA_LOG_RETENTION_BYTES: "100000000"
      KAFKA_ZOOKEEPER_CONNECT: zoo1:2181,zoo2:2181,zoo3:2181
      KAFKA_CREATE_TOPICS: "log:3:3"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'true'
  zoo1:
    image: elevy/zookeeper:latest
    environment:
      MYID: 1
      SERVERS: zoo1,zoo2,zoo3
    ports:
      - "2181:2181"
  zoo2:
    image: elevy/zookeeper:latest
    environment:
      MYID: 2
      SERVERS: zoo1,zoo2,zoo3
    ports:
      - "2182:2181"
  zoo3:
    image: elevy/zookeeper:latest
    environment:
      MYID: 3
      SERVERS: zoo1,zoo2,zoo3
    ports:
      - "2183:2181"
  filebeat:
    image: docker.elastic.co/beats/filebeat:7.5.2
    volumes:
      - "./filebeat.yml:/usr/share/filebeat/filebeat.yml:ro"
      - "./sample-logs:/sample-logs"
    links:
      - kafka1
      - kafka2
      - kafka3
    depends_on:
      - kafka1
      - kafka2
      - kafka3
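
Before digging into Filebeat, it is worth confirming that the `log` topic was actually created with the expected partitions and replicas. A minimal check, assuming the wurstmeister/kafka image keeps the Kafka CLI scripts on the container's PATH:

docker-compose exec kafka1 kafka-topics.sh --zookeeper zoo1:2181 --list
docker-compose exec kafka1 kafka-topics.sh --zookeeper zoo1:2181 --describe --topic log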

filebeat.yml

filebeat.inputs:
- paths:
    - /sample-logs/request-sample.log
  tags:
    - request-sample
  input_type: log
  document_type: request-sample
  fields_under_root: true

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092", "kafka3:9092"]
  topic: 'log'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000
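
Filebeat's own self-test can rule out a bad YAML file before anything else; a sketch, run against the container defined above (the `test config` subcommand exists in Filebeat 7.x, and the log grep is just one way to spot harvester or Kafka producer activity):

docker-compose exec filebeat filebeat test config -c /usr/share/filebeat/filebeat.yml
docker-compose logs -f filebeat | grep -i -e harvester -e kafka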

The sample log file expected to be read (sample-logs/request-sample.log):

2020-01-10 13:33:14,782 INFO {"appName":"xxxx-consultar-transacao-java","component":"br.com.bd.components.logger.RequestFilter","logType":"FUNC","env":"dev","eventTime":"20200110133314717","logSeverity":6,"soc":false,"baseUri":"/xxxx/transactions/v1","resourceURI":"/transactions/retained","resourceAction":"GET","entity":"","statusHttpCode":200,"statusCode":"OK","requestBytes":1150,"responseBytes":0,"responseTime":3754,"params":{"transaction_type":["T"],"status":["A"],"start_date":["20191211"],"end_date":["20200110"],"scheduling":["false"],"offset":["1"],"size":["100"]},"header":{"content-length":"1150","postman-token":"30bccc36-952d-4286-9cc4-2a795193fc5b","host":"bcodmswrk01:8087","connection":"keep-alive","cache-control":"no-cache","accept-encoding":"gzip, deflate","user-agent":"PostmanRuntime/7.21.0","accept":"*/*"},"pathParams":{},"src":"10.100.13.250","solicitationID":"e4af7622-a38a-4502-96c0-c8decf9dae64","headerXForwardedFor":"10.100.13.250"} 

logstash.conf

input {
  kafka {
    bootstrap_servers => "kafka1:9092,kafka2:9092,kafka3:9092"
    client_id => "logstash"
    group_id => "logstash"
    consumer_threads => 3
    topics => ["log"]
    codec => "json"
    tags => ["log", "kafka_source"]
    type => "log"
  }
}

filter {
  if [type] == "request-sample" {
    grok {
      match => { "message" => "%{COMMONAPACHELOG}" }
    }
    date {
      match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"]
      remove_field => ["timestamp"]
    }
  }
}

output {
  elasticsearch {
    hosts => ["elasticsearch:9200"]
    index => "logstash-%{[type]}-%{+YYYY.MM.dd}"
  }
  stdout { codec => rubydebug }
}
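
Because port 9600 is already published for Logstash, its monitoring API can show whether events are flowing at all; a quick check using the stock node-stats endpoint:

curl -XGET 'localhost:9600/_node/stats/pipelines?pretty'

If the pipeline's `events.in` counter stays at 0, the Kafka input never received a message; if `in` grows while `out` stays at 0, the problem sits between the filter block and the Elasticsearch output.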

Logging into the Elasticsearch container and searching for an index created from my logs, I don't see it at all:

sh-4.2# curl -XGET 'localhost:9200/_cat/indices'
yellow open twitter                  _YO2OfkZTyml62a0q_2Vag 1 1 0 0   283b   283b
green  open .kibana_task_manager_1   ZPM_sJH8Se6hpqcBNaArxw 1 0 2 1 16.2kb 16.2kb
green  open .apm-agent-configuration ZwUEUvCIQzuEeFkXiGAMFg 1 0 0 0   283b   283b
green  open .kibana_1                dMtrvO2OSW6OwUsF06B4sg 1 0 7 0 25.6kb 25.6kb
sh-4.2#

Looking at the logs of each container, I can't find any exception nor any information that could hint at which part of the stack is failing.

Please, can someone give me a clue whether I am missing some extra Filebeat configuration to start the process, or point me to any other configuration I could check?

*** edited

Logs from the Kafka container:

[2020-02-06 01:47:32,621] WARN [SocketServer brokerId=1] Unexpected error from /172.26.0.1; closing connection (org.apache.kafka.common.network.Selector)
org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1212498244 larger than 104857600)
    at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:104)
    at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
    at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
    at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:483)
    at kafka.network.Processor.poll(SocketServer.scala:890)
    at kafka.network.Processor.run(SocketServer.scala:789)
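
One detail worth decoding here: the rejected "size" is not a real message size. Kafka reads the first four bytes of every request as a length prefix, and 1212498244 is exactly 0x48454144, i.e. the ASCII bytes "HEAD", so this WARN is some HTTP client (likely a health check or browser) hitting port 9092 rather than Filebeat traffic. A shell one-liner confirms the decoding:

printf '%x\n' 1212498244       # prints 48454144
printf '\x48\x45\x41\x44\n'    # prints HEAD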

*** edited

Filebeat console output (screenshot): it shows a harvester being started for the sample log file.

  • First of all, you can check if the topic is empty: `kafkacat -C -b localhost:9092 -t log -o beginning -f '\nKey (%K bytes): %k\t\nValue (%S bytes): %s\n\Partition: %p\tOffset: %o\n--\n'` – Iskuskov Alexander Feb 05 '20 at 22:07
  • By the way, multiple brokers on one machine will be slower than just one – OneCricketeer Feb 05 '20 at 23:27
  • @IskuskovAlexander I can't use kafkacat nor install it in my image. Any other trick to analyze the topic? – Jim C Feb 06 '20 at 15:41
  • E.g. using the console consumer: `kafka-console-consumer --bootstrap-server localhost:9092 --topic my-topic --from-beginning` – Iskuskov Alexander Feb 06 '20 at 15:45
  • You can use kafkacat from your localhost to connect to your brokers, just install it – Iskuskov Alexander Feb 06 '20 at 15:46
  • Thanks, it seems the topic is empty. I tried: kafka-console-consumer --bootstrap-server localhost:9092 --topic log --from-beginning. Probably it means that Filebeat is not pushing to Kafka. Any clue what to check in Filebeat's docker-compose.yml or its filebeat.yml? For instance, how/what would you try to see if Filebeat is really "beating" on the above log file? – Jim C Feb 06 '20 at 16:02
  • @IskuskovAlexander does this WARN mean I am missing something: "WARN [SocketServer brokerId=1] Unexpected error from /172.26.0.1; closing connection (org.apache.kafka.common.network.Selector) org.apache.kafka.common.network.InvalidReceiveException: Invalid receive (size = 1212498244 larger than 104857600)" (I pasted the whole log above). Am I missing some configuration to make the Kafka topic accept larger messages? – Jim C Feb 06 '20 at 16:05
  • From the image pasted above, you can see that Filebeat mentions some harvester task pointing to my log file, but I can't see any text on my Kafka consumer console. Please give me some suggestions on what to check – Jim C Feb 06 '20 at 16:16
  • Please check this answer stackoverflow.com/a/57141669/7109598 – Iskuskov Alexander Feb 06 '20 at 16:25
  • A better option is to decrease the Filebeat message size – Iskuskov Alexander Feb 06 '20 at 16:26
  • I decreased the message to `2020-01-2 13:34:14,782 INFO {"appName":"alcd-consultar-transacao-java"}`. Just one line in the file, and I still see nothing in the Kafka consumer, nor any useful error message – Jim C Feb 06 '20 at 18:49
  • What if you send messages to Kafka manually? https://kafka.apache.org/quickstart#quickstart_send – Iskuskov Alexander Feb 06 '20 at 19:50
  • Maybe useful: https://stackoverflow.com/questions/49370959/getting-org-apache-kafka-common-network-invalidreceiveexception-invalid-receiv – Ashish Bhosle Feb 07 '20 at 10:39
  • @AshishBhosle from your link suggestion (https://stackoverflow.com/a/59730358/4148175) I read: "... this is probably annoying but harmless...". Do you know if I can ignore such an error message? I found many people saying to increase max.message.bytes, which I have done, but I still sometimes get this error even when the message is really small. – Jim C Feb 13 '20 at 13:38
  • According to https://stackoverflow.com/a/59730358/4148175, "This indicates that HTTP traffic is being sent to Kafka port 9092, but Kafka doesn't accept HTTP traffic, it only accepts its own protocol (which takes the first four bytes as the receive size, hence the error)". Does it mean in my case I should set another protocol instead of "PLAINTEXT://kafka1:9092"? – Jim C Feb 13 '20 at 13:40
  • @IskuskovAlexander if I send the message manually I don't get the same error. Does that give you a clue about what is wrong in my Docker Compose file? – Jim C Feb 13 '20 at 13:41
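
For reference, the manual round trip suggested in the comments can be done from inside one of the broker containers; a sketch using the script names shipped in the wurstmeister/kafka image:

docker-compose exec kafka1 kafka-console-producer.sh --broker-list localhost:9092 --topic log
docker-compose exec kafka1 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic log --from-beginning

Type a line into the producer; if it shows up in the consumer, the brokers themselves are healthy and the problem is on the Filebeat side.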
