
I am following this GitHub repo:

https://github.com/hannesstockner/kafka-connect-elasticsearch/

and I am trying to read data from a file source into Elasticsearch.

I am getting an error when I run the standalone.sh script:

Failed to flush WorkerSourceTask{id=local-file-source-0}, timed out while waiting for producer to flush outstanding messages, 1 left ({ProducerRecord(topic=recipes, partition=null, key=null, value=[B@6704e57f=ProducerRecord(topic=recipes, partition=null, key=null, value=[B@6704e57f})

And these are my configs:

connect-elasticsearch-sink.properties

name=local-elasticsearch-sink
connector.class=com.hannesstockner.connect.es.ElasticsearchSinkConnector
tasks.max=1
es.host=10.200.10.1
topics=recipes
index.prefix=kafka_

connect-file-source.properties

name=local-file-source
connector.class=org.apache.kafka.connect.file.FileStreamSourceConnector
tasks.max=1
# file path below is illustrative; the actual input file is whatever I point it at
file=test.txt
topic=recipes

connect-standalone.properties

bootstrap.servers=10.200.10.1:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false

internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
#offset.flush.interval.ms=20000
offset.flush.timeout.ms=20000

and my Docker Compose config:

kafka:
  image: flozano/kafka:0.9.0.0
  ports:
    - "2181:2181"
    - "9092:9092"
  environment:
    ADVERTISED_HOST: ${DOCKER_IP}
elasticsearch:
  image: elasticsearch:2.1
  ports:
    - "9200:9200"
    - "9300:9300"

I tried setting offset.flush.timeout.ms=20000 and producer.buffer.memory=10 in my connect-standalone.properties file, following this thread, but no luck:

Kafka Connect - Failed to flush, timed out while waiting for producer to flush outstanding messages

  • That repo has not been updated in 6 years. What exactly does it do differently from the "real" [kafka-connect-elasticsearch](https://github.com/confluentinc/kafka-connect-elasticsearch) that Elastic actually supports? Other than that, FileSourceConnector isn't really meant to be used. – OneCricketeer Jan 17 '22 at 04:44

1 Answer


If you want to read files into Elasticsearch (or Kafka), it would be preferable to use Filebeat.
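
For instance, a minimal filebeat.yml sketch that tails a file and ships it either to Elasticsearch or to a Kafka topic (the file path here is a placeholder, and the hosts are taken from your configs above; verify both against your setup):

filebeat.inputs:
  - type: log
    paths:
      - /path/to/recipes.txt

output.elasticsearch:
  hosts: ["10.200.10.1:9200"]

# Alternatively, ship to Kafka instead (Filebeat allows only one output at a time):
#output.kafka:
#  hosts: ["10.200.10.1:9092"]
#  topic: recipes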

The FileSourceConnector is documented as an example, not a production-level product. Meanwhile, there are other connectors built for this purpose, such as the Spooldir connector or the kafka-connect-fs project; a Spooldir sketch follows.
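
For illustration, a Spooldir source config might look roughly like this (the name, paths, and file pattern are placeholders; check the connector's documentation for the exact settings):

name=spooldir-json-source
connector.class=com.github.jcustenborder.kafka.connect.spooldir.SpoolDirJsonSourceConnector
tasks.max=1
topic=recipes
input.path=/data/input
finished.path=/data/finished
error.path=/data/error
input.file.pattern=.*\.json
schema.generation.enabled=true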

Further, the Elasticsearch Kafka Connector that is actually supported and actively developed is https://github.com/confluentinc/kafka-connect-elasticsearch.
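
A minimal sink config for it would look something like this (the connection.url reuses your host from above, and the ignore flags are assumptions for schemaless JSON input; verify against the connector's current docs):

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=recipes
connection.url=http://10.200.10.1:9200
key.ignore=true
schema.ignore=true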

Plus, you should use a different Kafka Docker image that is also maintained and up to date (such as those from bitnami or confluentinc), which you can pair with a Docker Kafka Connect image such as mine, rather than reading local files.

Your Elasticsearch Docker image version is also 6+ years old.
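
As a rough sketch in the same Compose style you are already using, with newer images for both (the image tags and environment variables are assumptions; check each image's documentation before relying on them):

zookeeper:
  image: bitnami/zookeeper:3.8
  ports:
    - "2181:2181"
  environment:
    ALLOW_ANONYMOUS_LOGIN: "yes"
kafka:
  image: bitnami/kafka:3.3
  ports:
    - "9092:9092"
  environment:
    KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_CFG_ADVERTISED_LISTENERS: PLAINTEXT://10.200.10.1:9092
    ALLOW_PLAINTEXT_LISTENER: "yes"
elasticsearch:
  image: elasticsearch:7.17.9
  ports:
    - "9200:9200"
  environment:
    discovery.type: single-node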

– OneCricketeer