
I'm building a data pipeline using Kafka. The data flow is as follows: capture data changes in MongoDB and send them to Elasticsearch.


MongoDB

  • version 3.6
  • shard cluster

Kafka

  • Confluent Platform 4.1.0
  • MongoDB source connector: Debezium 0.7.5
  • Elasticsearch sink connector

Elasticsearch

  • version 6.1.0

Since I'm still testing, all Kafka-related systems are running on a single server.

  • start ZooKeeper

    $ bin/zookeeper-server-start etc/kafka/zookeeper.properties
    
  • start the Kafka broker (bootstrap server)

    $ bin/kafka-server-start etc/kafka/server.properties
    
  • start the Schema Registry

    $ bin/schema-registry-start etc/schema-registry/schema-registry.properties
    
  • start the MongoDB source connector

    $ bin/connect-standalone \ 
      etc/schema-registry/connect-avro-standalone.properties \ 
      etc/kafka/connect-mongo-source.properties
    
    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    
    $ cat etc/schema-registry/connect-avro-standalone.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8083
    
  • start the Elasticsearch sink connector

    $ bin/connect-standalone \ 
      etc/schema-registry/connect-avro-standalone2.properties  \ 
      etc/kafka-connect-elasticsearch/elasticsearch.properties
    
    $ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
    >>>
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    
    $ cat etc/schema-registry/connect-avro-standalone2.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8084
    

Everything works with the setup above: the source connector captures data changes (CDC) and the sink connector successfully sends them to Elasticsearch. The problem is that I cannot convert the string-typed message data into a structured data type. For instance, let's consume the topic after making some changes in MongoDB.

    $ bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic higee.higee.higee --from-beginning | jq

Then I get the following result.

    "after": null,
      "patch": {
        "string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
      },
      "source": {
        "version": {
          "string": "0.7.5"
        },
        "name": "higee",
        "rs": "172.31.50.13",
        "ns": "higee",
        "sec": 1524214412,
        "ord": 1,
        "h": {
          "long": -2379508538412995600
        },
        "initsync": {
          "boolean": false
        }
      },
      "op": {
        "string": "u"
      },
      "ts_ms": {
        "long": 1524214412159
      }
    }

Then, if I look in Elasticsearch, I get the following result.

    {
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "after": null,
          "patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"}, 
                       "name" : "higee", 
                       "salary" : 100,
                       "origin" : "South Korea"}""",
          "source": {
            "version": "0.7.5",
            "name": "higee",
            "rs": "172.31.50.13",
            "ns": "higee",
            "sec": 1524214412,
            "ord": 1,
            "h": -2379508538412995600,
            "initsync": false
          },
          "op": "u",
          "ts_ms": 1524214412159
        }
      }

What I want to achieve is something like the following:

    {
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "oid" : "5ad97f982a0f383bb638ecac",
          "name" : "higee", 
          "salary" : 100,
          "origin" : "South Korea"
         }"
     }

Some of the options I've been trying and am still considering are as follows.

  • logstash

    • case 1: don't know how to parse those characters (\u0002, \u0001)

      • logstash.conf

        input {
          kafka {
            bootstrap_servers => ["localhost:9092"]
            topics => ["higee.higee.higee"]
            auto_offset_reset => "earliest"
            codec => json {
              charset => "UTF-8"
            }
          }
        }
        
        filter {
          json {
            source => "message"
          }
         }
        
        output {
          stdout {
            codec => rubydebug
          }
        }
        
      • result

        {
        "message" => "H\u0002�\u0001{\"_id\" : \
            {\"$oid\" : \"5adafc0e2a0f383bb63910a6\"}, \
             \"name\" : \"higee\", \
             \"salary\" : 101, \
             \"origin\" : \"South Korea\"} \
             \u0002\n0.7.5\nhigee \ 
             \u0018172.31.50.13\u001Ahigee.higee2 \ 
             ��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
        "tags" => [[0] "_jsonparsefailure"]
        }
        
    • case 2

      • logstash.conf

        input {
          kafka {
            bootstrap_servers => ["localhost:9092"]
            topics => ["higee.higee.higee"]
            auto_offset_reset => "earliest"
            codec => avro {
              schema_uri => "./test.avsc"
            }
          }
        }
        
        filter {
          json {
            source => "message"
          }
        }
        
        output {
          stdout {
            codec => rubydebug
          }
        }
        
      • test.avsc

        {
            "namespace": "example",
            "type": "record",
            "name": "Higee",
            "fields": [
              {"name": "_id", "type": "string"},
              {"name": "name", "type": "string"},
              {"name": "salary",  "type": "int"},
              {"name": "origin", "type": "string"}
            ]
         }
        
      • result

        An unexpected error occurred! {:error=>#<NoMethodError: 
        undefined method `type_sym' for nil:NilClass>, :backtrace=> 
        ["/home/ec2-user/logstash- 
        6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:224:in `match_schemas'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:280:in `read_data'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:376:in `read_union'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:309:in `read_data'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:384:in `block in read_record'", 
        "org/jruby/RubyArray.java:1734:in `each'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:382:in `read_record'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:310:in `read_data'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:275:in `read'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/ 
        logstash-codec-avro-3.2.3-java/lib/logstash/codecs/ 
        avro.rb:77:in `decode'", "/home/ec2-user/logstash-6.1.0/ 
        vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
        8.0.2/lib/ logstash/inputs/kafka.rb:254:in `block in 
        thread_runner'", "/home/ec2-user/logstash- 
        6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
        8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in 
        thread_runner'"]}
        
  • python client

    • consume the topic, manipulate the data, and produce it to a different topic, so that the Elasticsearch sink connector can consume well-formatted messages from the Python-manipulated topic
    • kafka-python library: wasn't able to decode the message (see the sketch after this list)

      from kafka import KafkaConsumer
      
      # kafka-python takes topic names as positional arguments
      consumer = KafkaConsumer(
                   'higee.higee.higee',
                   auto_offset_reset='earliest'
                 )
      
      for message in consumer:
          print(message.value.decode('utf-8'))
      
      >>> 'utf-8' codec can't decode byte 0xe4 in position 6: 
          invalid continuation byte
      
    • confluent_kafka: wasn't compatible with Python 3 in my environment
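
A minimal debugging sketch for the kafka-python attempt above. I suspect the messages are in Confluent's Avro wire format (one magic byte, a 4-byte schema ID, then binary Avro), which would explain why plain UTF-8 decoding fails:

    import struct
    
    from kafka import KafkaConsumer
    
    consumer = KafkaConsumer(
                   'higee.higee.higee',
                   auto_offset_reset='earliest'
               )
    
    for message in consumer:
        raw = message.value
        # first byte should be the magic byte 0, the next 4 bytes the schema ID
        magic, schema_id = struct.unpack('>bI', raw[:5])
        print(magic, schema_id)
        # the rest is binary Avro, not UTF-8 JSON
        print(raw[5:50])
        break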


Any idea how I can get this data into Elasticsearch as structured JSON?

Thanks in advance.


Some attempts

1) I changed my connect-mongo-source.properties file as follows to test the transformation.

    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    transforms=unwrap
    transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

And the following is the error log I got. Not yet being comfortable with Kafka and, more importantly, the Debezium platform, I wasn't able to debug this error.

ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
    at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
    at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
    at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2) This time, I changed elasticsearch.properties and didn't change connect-mongo-source.properties.

$ cat connect-mongo-source.properties

    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee

$ cat elasticsearch.properties

    name=elasticsearch-sink
    connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    transforms=unwrap
    transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

And I got the following error.

ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
    at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
    at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

3) I changed test.avsc and ran Logstash. I didn't get any error message, but the outcome wasn't what I expected: the origin, salary, and name fields were all empty even though they had non-null values. I was even able to read the data properly through the console consumer.

$ cat test.avsc
>>>
    {
      "type" : "record",
      "name" : "MongoEvent",
      "namespace" : "higee.higee",
      "fields" : [ {
        "name" : "_id",
        "type" : {
          "type" : "record",
          "name" : "HigeeEvent",
          "fields" : [ {
            "name" : "$oid",
            "type" : "string"
          }, {
            "name" : "salary",
            "type" : "long"
          }, {
            "name" : "origin",
            "type" : "string"
          }, {
            "name" : "name",
            "type" : "string"
          } ]
        }
      } ]
    }

$ cat logstash3.conf
>>>
    input {
      kafka {
        bootstrap_servers => ["localhost:9092"]
        topics => ["higee.higee.higee"]
        auto_offset_reset => "earliest"
        codec => avro {
          schema_uri => "./test.avsc"
        }
      }
    }

    output {
      stdout {
       codec => rubydebug
      }
    }

$ bin/logstash -f logstash3.conf
>>>
    {
    "@version" => "1",
    "_id" => {
      "salary" => 0,
      "origin" => "",
      "$oid" => "",
      "name" => ""
    },
    "@timestamp" => 2018-04-25T09:39:07.962Z
    }
    I think you need to include the data from Mongo itself in your debugging... The patch field is apparently only a string, not an object... Regarding Logstash, `codec => json {` doesn't work for Avro data, and in your Avro codec example, what are `favorite_number` and `favorite_color` supposed to represent?? `$oid` isn't the same as `_id` – OneCricketeer Apr 21 '18 at 20:27
  • @cricket_007 1) `favorite_number` and `favorite_color` were `salary` and `origin` respectively. I just copy-pasted different parts. I edited those in above question. 2) what do you mean by actual data from mongodb? patch field represents raw data in string format. That's how I understood and they actually are real data. 3) $oid is object id for each document in mongodb and that's given by mongodb by default. Is there a way I could change/remove that field? – higee Apr 22 '18 at 01:21

3 Answers


Python Client

You must use the Avro consumer; otherwise you will get 'utf-8' codec can't decode byte errors.

Even this example will not work because you still need the Schema Registry to look up the schema.

The prerequisites of Confluent's Python client say it works with Python 3.x.

Nothing is stopping you from using a different client, so I'm not sure why you stopped at only trying Python.
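
For illustration, a minimal sketch of what the Avro consumer usage could look like (the broker address, group id, and registry URL below are placeholders, not taken from your setup):

from confluent_kafka.avro import AvroConsumer

c = AvroConsumer({
    'bootstrap.servers': 'localhost:9092',
    'group.id': 'test-group',
    'schema.registry.url': 'http://localhost:8081'
})
c.subscribe(['higee.higee.higee'])

msg = c.poll(10)
if msg is not None and msg.error() is None:
    # already deserialized into a dict using the schema from the registry
    print(msg.value())
c.close()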

Logstash Avro Codec

  1. The JSON codec cannot decode Avro data, and I don't think the json filter following the avro input codec will work either.
  2. Your Avro schema is wrong: it's missing the $oid field nested under _id.
  3. There is a difference between "raw Avro" (that includes the schema within the message itself), and Confluent's encoded version of it (which only contains the schema ID in the registry). Meaning, Logstash doesn't integrate with the Schema Registry... at least not without a plugin.

Your AVSC should actually look like this

{
  "type" : "record",
  "name" : "MongoEvent",
  "namespace" : "higee.higee",
  "fields" : [ {
    "name" : "_id",
    "type" : {
      "type" : "record",
      "name" : "HigeeEvent",
      "fields" : [ {
        "name" : "$oid",
        "type" : "string"
      }, {
        "name" : "salary",
        "type" : "long"
      }, {
        "name" : "origin",
        "type" : "string"
      }, {
        "name" : "name",
        "type" : "string"
      } ]
    }
  } ]
}

However, Avro doesn't allow for names starting with anything but a regex of [A-Za-z_], so that $oid would be a problem.

While I would not recommend it (nor have I actually tried it), one possible way to get your JSON-encoded Avro data into Logstash from the Avro console consumer could be to use the Pipe input plugin:

input {
  pipe {
    codec => json
    command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning" 
  }
}

Debezium

note that the after value is always a string, and that by convention it will contain a JSON representation of the document

http://debezium.io/docs/connectors/mongodb/

I think this also applies to patch values, but I don't know Debezium, really.

Kafka won't parse the JSON in-flight without the use of a Simple Message Transform (SMT). Reading the documentation you linked to, you should probably add these to your Connect Source properties

transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

Also worth pointing out, field flattening is on the roadmap - DBZ-561

Kafka Connect Elasticsearch

Elasticsearch doesn't parse and process encoded JSON string objects without the use of something like Logstash or its JSON Processor. Rather, it only indexes them as a whole string body.

If I recall correctly, Connect will only apply an Elasticsearch mapping to top-level Avro fields, not nested ones.

In other words, the mapping that is generated follows this pattern,

"patch": {
    "string": "...some JSON object string here..."
  },

Whereas you actually need it to look like this, perhaps by manually defining your ES index mapping:

"patch": {
   "properties": {
      "_id": {
        "properties" {
          "$oid" :  { "type": "text" }, 
          "name" :  { "type": "text" },
          "salary":  { "type": "int"  }, 
          "origin": { "type": "text" }
      },

Again, not sure if the dollar sign is allowed, though.
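
If you do define the index manually, a rough sketch of creating it up front could look like this (Elasticsearch 6.x syntax; I'm showing the flattened document you said you want rather than the nested patch mapping above, and the field types are assumptions based on your sample data):

import json
import requests

mapping = {
    "mappings": {
        "kafka-connect": {               # type.name from your sink config
            "properties": {
                "name":   {"type": "text"},
                "origin": {"type": "text"},
                "salary": {"type": "integer"}
            }
        }
    }
}

# index name matches the topic the sink connector writes to
resp = requests.put("http://localhost:9200/higee.higee.higee",
                    data=json.dumps(mapping),
                    headers={"Content-Type": "application/json"})
print(resp.json())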

Kafka Connect MongoDB Source

If none of the above are working, you could attempt a different connector

  • First of all, I appreciate your feedback. Configuring the Debezium connector with the following settings didn't work the way I expected: transforms=unwrap transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope. And yes, I read the Confluent doc saying they support Python 3. Once I imported the library, however, I got an exception since some of the code was written in Python 2 syntax. Following is an example: except NameError, err: – higee Apr 22 '18 at 01:34
  • Instead of manually changing them to python3-compatible code, I just used python2 and it worked. Following is the code – higee Apr 22 '18 at 01:34
  • Please don't post code or long errors into the comments – OneCricketeer Apr 22 '18 at 01:36
  • I see, they are not at all readable. Then how could I paste error messages or code blocks? – higee Apr 22 '18 at 01:37
  • Edit your post or if you have solved the problem, post your own answer – OneCricketeer Apr 22 '18 at 01:37
  • I did post my answer :D – higee Apr 22 '18 at 02:20
  • What's wrong with SMT solution? Also I recommend to do unwrap on the sink connector side. Please check an example - https://github.com/debezium/debezium-examples/tree/master/unwrap-mongodb-smt Also you can check how the unwrapping works when streaming to Elasticsearch http://debezium.io/blog/2018/01/17/streaming-to-elasticsearch/ so you just need to combine both sources mentioned to source from MongoDB to Elasticsearch – Jiri Pechanec Apr 23 '18 at 06:13
  • @Jiri I think you should comment that on the other answer – OneCricketeer Apr 23 '18 at 06:45
  • @JiriPechanec I missed your comment. I've tried transformation on either side of connectors and I left my logs on my original question. I couldn't paste every trial I've done on this post. Still, however, I'm facing similar errors and still haven't yet figured out how to debug them. – higee Apr 27 '18 at 05:30
  • Biggest obstacle was that I wasn't yet able to understand the syntax and usage of TRANSFORMATIONS. For instance, the transformation configurations of the sink connector are all different in https://github.com/debezium/debezium-examples/blob/master/unwrap-mongodb-smt/jdbc-sink.json, http://debezium.io/blog/tags/elasticsearch/, and https://www.confluent.io/blog/simplest-useful-kafka-connect-data-pipeline-world-thereabouts-part-3/. I know that those three examples are all different environments. Still, I couldn't find the right transformation configuration for my use case. – higee Apr 27 '18 at 05:34
  • @GeeYeolNahm Syntax is `"transforms": "<name>"`, then `"transforms.<name>.type"` is set to some class that implements a Transformation. https://github.com/debezium/debezium/blob/master/debezium-connector-mongodb/src/main/java/io/debezium/connector/mongodb/transforms/UnwrapFromMongoDbEnvelope.java Then, when you have more than one transformation, they are applied left to right – OneCricketeer Apr 27 '18 at 12:52

I was able to solve this issue using a Python Kafka client. The following is the new architecture of my pipeline.

(pipeline diagram: MongoDB → Debezium source connector → Kafka topic → Python client that consumes, transforms, and re-produces → new Kafka topic → Elasticsearch sink connector)

I used Python 2 even though the Confluent documentation says Python 3 is supported. The main reason was that some of the code used Python 2 syntax. For instance (not exactly the following line, but similar syntax):

    except NameError, err:

In order to use it with Python 3, I would need to convert lines like the above into:

    except NameError as err:

That being said, the following is my Python code. Note that this code is only for prototyping and is not production-ready yet.

Consume a message via Confluent Consumer

  • code

    from confluent_kafka.avro import AvroConsumer
    
    c = AvroConsumer({ 
           'bootstrap.servers': '',
           'group.id': 'groupid',
           'schema.registry.url': ''
        })
    
    c.subscribe(['higee.higee.higee'])
    
    x = True
    
    while x:
        msg = c.poll(100)
        if msg:
            message = msg.value()
            print(message)
            x = False
    
    c.close()
    
  • (after updating a document in MongoDB) let's check the message variable

    {u'after': None,
     u'op': u'u',
     u'patch': u'{
         "_id" : {"$oid" : "5adafc0e2a0f383bb63910a6"},
         "name" : "higee",
         "salary" : 100,
         "origin" : "S Korea"}',
     u'source': {
         u'h': 5734791721791032689L,
         u'initsync': False,
         u'name': u'higee',
         u'ns': u'higee.higee',
         u'ord': 1,
         u'rs': u'',
         u'sec': 1524362971,
         u'version': u'0.7.5'},
     u'ts_ms': 1524362971148
     }
    

Manipulate the consumed message

  • code

    patch = message['patch']
    patch_dict = eval(patch)
    patch_dict.pop('_id')
    
  • check patch_dict

    {'name': 'higee', 'origin': 'S Korea', 'salary': 100}
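
(A variant of the same step that avoids eval; since the patch field is plain JSON, json.loads should work as well.)

    import json
    
    patch_dict = json.loads(message['patch'])
    patch_dict.pop('_id', None)  # drop the MongoDB ObjectId before re-producing
    print(patch_dict)            # same content as above (strings are unicode under Python 2)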
    

Produce a message via Confluent Producer

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    value_schema_str = """
    {
       "namespace": "higee.higee",
       "name": "MongoEvent",
       "type": "record",
       "fields" : [
           {
               "name" : "name",
               "type" : "string"
           },
           {
              "name" : "origin",
              "type" : "string"
           },
           {
               "name" : "salary",
               "type" : "int32"
           }
       ]
    }
    """
    AvroProducerConf = {
        'bootstrap.servers': '',
        'schema.registry.url': ''
    }

    value_schema = avro.loads(value_schema_str)  # parse the schema defined above
    avroProducer = AvroProducer(
                       AvroProducerConf, 
                       default_value_schema=value_schema
                   )

    avroProducer.produce(topic='python', value=patch_dict)
    avroProducer.flush()

The only thing left is to make the Elasticsearch sink connector consume the new topic 'python' by setting its configuration in the following format. Everything remains the same except topics.

    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=python
    key.ignore=true
    connection.url=''
    type.name=kafka-connect

Then run the Elasticsearch sink connector and check the result in Elasticsearch.

    {
        "_index": "zzzz",
        "_type": "kafka-connect",
        "_id": "zzzz+0+3",
        "_score": 1,
        "_source": {
          "name": "higee",
          "origin": "S Korea",
          "salary": 100
        }
      }
  • Keep in mind: `eval()` should not be used in Python. Importing json module would be safer. Also, I still believe using a Java Kafka SMT would be the built-in solution to the problem, as this solution isn't failure proof – OneCricketeer Apr 22 '18 at 04:34

+1 to @cricket_007's suggestion - use the io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope single message transformation. You can read more about SMTs and their benefits here.

  • Thanks for the links. I've also read part 1 and part 2. They were helpful in understanding the overall concept. In addition, I've taken a look at two other great documents to get to know transformations more deeply. My problem at this point, however, was that I wasn't able to debug some errors that occurred. I'll edit my connect-mongo-source.properties file and paste the error messages. – higee Apr 25 '18 at 08:48
  • http://kafka.apache.org/documentation.html#connect_transforms and https://www.confluent.io/thank-you/single-message-transformations-not-transformations-youre-looking/ – higee Apr 25 '18 at 08:48
  • Here was another try. According to the `Configuration` section of this page http://debezium.io/docs/configuration/mongodb-event-flattening/, we need to put that configuration inside the sink connector property file. This time, I got a different error message. I'll put that one in my original question too. – higee Apr 25 '18 at 09:26
  • @GeeYeolNahm Is it possible to provide a message from the topic that causes the error? – Jiri Pechanec May 04 '18 at 03:54
  • @JiriPechanec I've included it in my original question. The code block just below "Then I get the following result" is the message. Could you check it? – higee May 04 '18 at 04:23