I'm building a data pipeline using Kafka. The data flow is as follows: capture data changes in MongoDB and send them to Elasticsearch.
MongoDB
- version 3.6
- sharded cluster
Kafka
- Confluent Platform 4.1.0
- mongoDB source connector : debezium 0.7.5
- elasticsearch sink connector
Elasticsearch
- version 6.1.0
Since I'm still testing, the Kafka-related systems are running on a single server.
start zookeeper
$ bin/zookeeper-server-start etc/kafka/zookeeper.properties
start kafka broker (bootstrap server)
$ bin/kafka-server-start etc/kafka/server.properties
start schema registry
$ bin/schema-registry-start etc/schema-registry/schema-registry.properties
start mongodb source connector
$ bin/connect-standalone \
    etc/schema-registry/connect-avro-standalone.properties \
    etc/kafka/connect-mongo-source.properties

$ cat etc/kafka/connect-mongo-source.properties
>>>
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee

$ cat etc/schema-registry/connect-avro-standalone.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8083
start elasticsearch sink connector
$ bin/connect-standalone \
    etc/schema-registry/connect-avro-standalone2.properties \
    etc/kafka-connect-elasticsearch/elasticsearch.properties

$ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
>>>
name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect

$ cat etc/schema-registry/connect-avro-standalone2.properties
>>>
bootstrap.servers=localhost:9092
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
rest.port=8084
Everything is fine with the above setup. The Kafka connector captures data changes (CDC) and successfully sends them to Elasticsearch via the sink connector. The problem is that I cannot convert the string-typed message data into a structured data type. For instance, let's consume the topic after making some changes to MongoDB.
$ bin/kafka-avro-console-consumer \
--bootstrap-server localhost:9092 \
--topic higee.higee.higee --from-beginning | jq
Then, I get the following result.
{
"after": null,
"patch": {
"string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
},
"source": {
"version": {
"string": "0.7.5"
},
"name": "higee",
"rs": "172.31.50.13",
"ns": "higee",
"sec": 1524214412,
"ord": 1,
"h": {
"long": -2379508538412995600
},
"initsync": {
"boolean": false
}
},
"op": {
"string": "u"
},
"ts_ms": {
"long": 1524214412159
}
}
Then, if I go to Elasticsearch, I get the following result.
{
"_index": "higee.higee.higee",
"_type": "kafka-connect",
"_id": "higee.higee.higee+0+3",
"_score": 1,
"_source": {
"after": null,
"patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"},
"name" : "higee",
"salary" : 100,
"origin" : "South Korea"}""",
"source": {
"version": "0.7.5",
"name": "higee",
"rs": "172.31.50.13",
"ns": "higee",
"sec": 1524214412,
"ord": 1,
"h": -2379508538412995600,
"initsync": false
},
"op": "u",
"ts_ms": 1524214412159
}
}
What I want to achieve is something like the following:
{
"_index": "higee.higee.higee",
"_type": "kafka-connect",
"_id": "higee.higee.higee+0+3",
"_score": 1,
"_source": {
"oid" : "5ad97f982a0f383bb638ecac",
"name" : "higee",
"salary" : 100,
"origin" : "South Korea"
}
}
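Just to be clear about the transformation I'm after: the patch value is already a JSON string, so outside the pipeline it is basically a json.loads plus flattening the _id (a toy sketch, using the values from the example above):

import json

# Toy example only: the 'patch' string copied from the consumed record above.
patch = '{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"},"name" : "higee","salary" : 100,"origin" : "South Korea"}'

doc = json.loads(patch)
doc['oid'] = doc.pop('_id')['$oid']  # flatten the ObjectId into a plain field
print(doc)
# {'name': 'higee', 'salary': 100, 'origin': 'South Korea', 'oid': '5ad97f982a0f383bb638ecac'}

What I can't figure out is the right place in the pipeline to do this kind of flattening.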
Some of the options I've been trying and am still considering are as follows.
logstash
case 1 : I don't know how to parse those characters (\u0002, \u0001)
logstash.conf
input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    topics => ["higee.higee.higee"]
    auto_offset_reset => "earliest"
    codec => json {
      charset => "UTF-8"
    }
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
result
{
    "message" => "H\u0002�\u0001{\"_id\" : {\"$oid\" : \"5adafc0e2a0f383bb63910a6\"}, \"name\" : \"higee\", \"salary\" : 101, \"origin\" : \"South Korea\"} \u0002\n0.7.5\nhigee \u0018172.31.50.13\u001Ahigee.higee2 ��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
    "tags" => [
        [0] "_jsonparsefailure"
    ]
}
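Since the Connect worker writes this topic with the Avro converter and the Schema Registry, I suspect the \u0002 / \u0001 bytes are just Avro binary framing (the Confluent wire format), not text delimiters, which would explain the _jsonparsefailure. A quick check I could run (a sketch with kafka-python, mirroring the logstash input settings):

# Sketch: peek at the raw bytes to see whether the values are Confluent-Avro
# encoded (magic byte 0x00 + 4-byte schema id + Avro binary) rather than JSON text.
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'higee.higee.higee',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000  # stop iterating once the topic is exhausted
)

for message in consumer:
    print(message.value[:5])  # expected: b'\x00' followed by the 4-byte schema id
    break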
case 2
logstash.conf
input {
  kafka {
    bootstrap_servers => ["localhost:9092"]
    topics => ["higee.higee.higee"]
    auto_offset_reset => "earliest"
    codec => avro {
      schema_uri => "./test.avsc"
    }
  }
}

filter {
  json {
    source => "message"
  }
}

output {
  stdout {
    codec => rubydebug
  }
}
test.avsc
{
  "namespace": "example",
  "type": "record",
  "name": "Higee",
  "fields": [
    {"name": "_id", "type": "string"},
    {"name": "name", "type": "string"},
    {"name": "salary", "type": "int"},
    {"name": "origin", "type": "string"}
  ]
}
result
An unexpected error occurred! {:error=>#<NoMethodError: undefined method `type_sym' for nil:NilClass>, :backtrace=>
["/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:224:in `match_schemas'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:280:in `read_data'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:376:in `read_union'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:309:in `read_data'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:384:in `block in read_record'",
 "org/jruby/RubyArray.java:1734:in `each'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:382:in `read_record'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:310:in `read_data'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro-1.8.2/lib/avro/io.rb:275:in `read'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-codec-avro-3.2.3-java/lib/logstash/codecs/avro.rb:77:in `decode'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:254:in `block in thread_runner'",
 "/home/ec2-user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka-8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in thread_runner'"]}
python client
- consume the topic and, after some data manipulation, produce to a different topic so that the Elasticsearch sink connector can consume well-formatted messages from the Python-manipulated topic (a rough sketch of this idea is below, after the library notes)
kafka
library : wasn't able to decode the message

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'higee.higee.higee',
    auto_offset_reset='earliest'
)

for message in consumer:
    message.value.decode('utf-8')

>>> 'utf-8' codec can't decode byte 0xe4 in position 6: invalid continuation byte
confluent_kafka
wasn't compatible with python 3
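Here is a rough sketch of what that Python step could look like (just the idea, not working code): consume the raw Avro bytes with kafka-python, strip the 5-byte Confluent wire-format header, fetch the writer schema from the Schema Registry, decode with fastavro, flatten the document out of after/patch, and re-produce it as plain JSON to a new topic. fastavro and requests are assumed to be installed, and higee.flattened is a topic name I made up.

import io
import json
import struct

import requests
from fastavro import schemaless_reader
from kafka import KafkaConsumer, KafkaProducer

SCHEMA_REGISTRY = 'http://localhost:8081'
IN_TOPIC = 'higee.higee.higee'
OUT_TOPIC = 'higee.flattened'   # made-up output topic

consumer = KafkaConsumer(IN_TOPIC,
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest')
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))

schemas = {}  # cache: schema id -> Avro schema fetched from the registry

for message in consumer:
    raw = message.value
    # Confluent wire format: magic byte 0x00 + 4-byte schema id + Avro payload.
    # This header is why a plain utf-8 decode fails.
    magic, schema_id = struct.unpack('>bI', raw[:5])
    if schema_id not in schemas:
        resp = requests.get('{}/schemas/ids/{}'.format(SCHEMA_REGISTRY, schema_id))
        schemas[schema_id] = json.loads(resp.json()['schema'])
    record = schemaless_reader(io.BytesIO(raw[5:]), schemas[schema_id])

    # Debezium's MongoDB connector puts the document in 'after' (inserts/reads)
    # or 'patch' (updates) as a JSON string.
    doc_json = record.get('after') or record.get('patch')
    if doc_json is None:
        continue
    doc = json.loads(doc_json)
    if '_id' in doc:
        doc['oid'] = doc.pop('_id').get('$oid')  # flatten the ObjectId
    producer.send(OUT_TOPIC, doc)

producer.flush()

The Elasticsearch sink would then have to read higee.flattened through a worker configured with org.apache.kafka.connect.json.JsonConverter (schemas.enable=false) instead of the Avro converter, but I haven't tried this end to end.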
Any idea how I can jsonify the data in Elasticsearch? The following are sources I searched:
- mongodb debezium
- mongodb event flattening
- avro converter
- serializing debezium events
- debezium tutorial
Thanks in advance.
Some attempts
1) I've changed my connect-mongo-source.properties file as follows to test the transformation.
$ cat etc/kafka/connect-mongo-source.properties
>>>
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee
transforms=unwrap
transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
And the following is the error log I got. Not yet being comfortable with Kafka and, more importantly, the Debezium platform, I wasn't able to debug this error.
ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
at org.bson.BsonDocument.parse(BsonDocument.java:62)
at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2) This time, I changed elasticsearch.properties and didn't make a change to connect-mongo-source.properties.
$ cat connect-mongo-source.properties
name=mongodb-source-connector
connector.class=io.debezium.connector.mongodb.MongoDbConnector
mongodb.hosts=''
initial.sync.max.threads=1
tasks.max=1
mongodb.name=higee
$ cat elasticsearch.properties
name=elasticsearch-sink
connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=higee.higee.higee
key.ignore=true
connection.url=''
type.name=kafka-connect
transforms=unwrap
transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope
And I got the following error.
ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
3) I changed test.avsc and ran logstash. I didn't get any error message, but the outcome wasn't what I was expecting, in that the origin, salary, and name fields were all empty even though they were given non-null values. I was even able to read the data properly through the console consumer.
$ cat test.avsc
>>>
{
"type" : "record",
"name" : "MongoEvent",
"namespace" : "higee.higee",
"fields" : [ {
"name" : "_id",
"type" : {
"type" : "record",
"name" : "HigeeEvent",
"fields" : [ {
"name" : "$oid",
"type" : "string"
}, {
"name" : "salary",
"type" : "long"
}, {
"name" : "origin",
"type" : "string"
}, {
"name" : "name",
"type" : "string"
} ]
}
} ]
}
$ cat logstash3.conf
>>>
input {
kafka {
bootstrap_servers => ["localhost:9092"]
topics => ["higee.higee.higee"]
auto_offset_reset => "earliest"
codec => avro {
schema_uri => "./test.avsc"
}
}
}
output {
stdout {
codec => rubydebug
}
}
$ bin/logstash -f logstash3.conf
>>>
{
"@version" => "1",
"_id" => {
"salary" => 0,
"origin" => "",
"$oid" => "",
"name" => ""
},
"@timestamp" => 2018-04-25T09:39:07.962Z
}
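My guess for 3) is that my hand-written test.avsc doesn't match the schema the source connector actually registered (the Debezium envelope with after / patch / source / op / ts_ms rather than the flattened document), which would explain the empty default values. A quick way to check (a sketch, assuming the default <topic>-value subject name and that requests is installed) would be to pull the registered schema from the Schema Registry and compare it with test.avsc:

import json

import requests

# Dump the value schema registered for the topic so it can be compared with test.avsc.
subject = 'higee.higee.higee-value'  # default subject naming: <topic>-value
resp = requests.get('http://localhost:8081/subjects/{}/versions/latest'.format(subject))
print(json.dumps(json.loads(resp.json()['schema']), indent=2))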