
This is a follow-up to my previous question, which was getting very long, so I created a new one. Here is my older question: Kafka connect setup to send record from Aurora using AWS MSK

I have the connector running, but it throws an error, and that is why my records are not going into Elasticsearch.

Here is my properties file:

quickstart-elasticsearch.properties

name=elasticsearch-sink
connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
tasks.max=1
topics=fspauditlambda
key.ignore=true
connection.url=https://drtrrterterterterterst-1.es.amazonaws.com
type.name=kafka-connect

And here are my connect-standalone.properties details:

bootstrap.servers=b-3.rtyrtyty.amazonaws.com:9092,b-6.rtyrtyty.amazonaws.com:9092,b-1.rtyrtyty.us-east-1.amazonaws.com:9092

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=true
value.converter.schemas.enable=true

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java

Then I start my connector, but when I do that I get an error like:

   org.apache.kafka.connect.errors.ConnectException: Cannot create mapping 
{"kafka-connect":{"properties":{"ID":{"type":"text","fields":{"keyword": -- {"root_cause":[{"type":"illegal_argument_exception","reason":"Types cannot be provided in put mapping requests, unless the include_type_name parameter is set to true."}],"type":"illegal_argument_exception","reason":"Types cannot be provided in put mapping requests, 
unless the include_type_name parameter is set to true."}

When I change the properties to the following:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000

the indices get created in Elasticsearch, but no data goes in and I get the error below:

[2020-01-03 12:27:12,906] ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
        at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Cannot infer mapping without schema.
        at io.confluent.connect.elasticsearch.Mapping.inferMapping(Mapping.java:84)
        at io.confluent.connect.elasticsearch.jest.JestElasticsearchClient.createMapping(JestElasticsearchClient.java:292)
        at io.confluent.connect.elasticsearch.Mapping.createMapping(Mapping.java:66)
        at io.confluent.connect.elasticsearch.ElasticsearchWriter.write(ElasticsearchWriter.java:260)

I have also tried this config:

topic.schema.ignore=true

But I still get the same error.
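For reference, my understanding is that with schemas.enable=true the JsonConverter expects each record to be a JSON envelope that carries its own schema alongside the payload, roughly like the sketch below (trimmed to two fields from my table; the payload values are made up):

{
  "schema": {
    "type": "struct",
    "name": "FSP_AUDIT",
    "optional": false,
    "fields": [
      { "type": "string", "optional": false, "field": "ID" },
      { "type": "string", "optional": true, "field": "ACTION_TYPE" }
    ]
  },
  "payload": {
    "ID": "abc-123",
    "ACTION_TYPE": "CREATE"
  }
}

With schemas.enable=false the records are plain JSON maps, so I assume the sink has no schema to infer a mapping from.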

Updating my question with my MySQL table definition:

CREATE TABLE FSP_AUDIT (
ID    NVARCHAR(255) NOT NULL,
VERSION    numeric(10,0) ,
ACTION_TYPE    NVARCHAR(255) ,
EVENT_TYPE    NVARCHAR(255) ,
CLIENT_ID    NVARCHAR(25) ,
DETAILS    TEXT(40000) ,
OBJECT_TYPE    NVARCHAR(255) ,
UTC_DATE_TIME    TIMESTAMP(6) NOT NULL,
POINT_IN_TIME_PRECISION    NVARCHAR(255) ,
TIME_ZONE    NVARCHAR(255) ,
TIMELINE_PRECISION    NVARCHAR(255) ,
GROUP_ID    NVARCHAR(255) ,
OBJECT_DISPLAY_NAME    NVARCHAR(200) ,
OBJECT_ID    NVARCHAR(255) ,
USR_DISPLAY_NAME    NVARCHAR(1500) ,
USR_ID    NVARCHAR(255) ,
PARENT_EVENT_ID    NVARCHAR(255) ,
NOTES    NVARCHAR(1000) ,
SUMMARY    NVARCHAR(4000) ,
ADTEVT_TO_UTC_DT    TIMESTAMP(6) ,
ADTEVT_TO_DATE_PITP    NVARCHAR(255) ,
ADTEVT_TO_DATE_TZ    NVARCHAR(255) ,
ADTEVT_TO_DATE_TP    NVARCHAR(255) ,
 PRIMARY KEY(ID)
);

This is the schema I expect to create upfront for Elasticsearch. Is it correct? Please suggest any changes.

{ 
   "schema":{ 
      "type":"struct",
      "fields":[ 
         { 
            "type":"string",
            "optional":false,
            "field":"ID"
         },
         { 
            "type":"integer",
            "optional":true,
            "field":"VERSION"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"ACTION_TYPE"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"EVENT_TYPE"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"CLIENT_ID"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"DETAILS"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"OBJECT_TYPE"
         },
         { 
            "type":"int64",
            "optional":false,
            "name":"org.apache.kafka.connect.data.Timestamp",
            "version":1,
            "field":"UTC_DATE_TIME"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"POINT_IN_TIME_PRECISION"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"TIME_ZONE"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"TIMELINE_PRECISION"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"GROUP_ID"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"OBJECT_DISPLAY_NAME"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"OBJECT_ID"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"USR_DISPLAY_NAME"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"USR_ID"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"PARENT_EVENT_ID"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"NOTES"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"SUMMARY"
         },
         { 
            "type":"int64",
            "optional":true,
            "name":"org.apache.kafka.connect.data.Timestamp",
            "version":1,
            "field":"ADTEVT_TO_UTC_DT"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"ADTEVT_TO_DATE_PITP"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"ADTEVT_TO_DATE_TZ"
         },
         { 
            "type":"string",
            "optional":true,
            "field":"ADTEVT_TO_DATE_TP"
         }
      ],
      "optional":false,
      "name":"FSP_AUDIT"
   }
}

1 Answer

The second error is saying that you need a schema, so your first edit to the property file is fine.
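In other words, keep the schema-enabled converter settings from your first attempt (copied here from your question):

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true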

The first error comes from a behavior change in Elasticsearch (mapping types were deprecated in 6.x and removed in 7.x), which requires defining the index mapping yourself:

Types cannot be provided in put mapping requests, unless the include_type_name parameter is set to true
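One way to do that (a rough sketch, not tested against your cluster) is to create the index and its mapping before starting the connector, for example from Kibana Dev Tools. The index name here assumes the connector's default of using the topic name, and the field types are only guesses based on your table definition:

PUT fspauditlambda
{
  "mappings": {
    "properties": {
      "ID":            { "type": "keyword" },
      "VERSION":       { "type": "long" },
      "ACTION_TYPE":   { "type": "keyword" },
      "DETAILS":       { "type": "text" },
      "UTC_DATE_TIME": { "type": "date" }
    }
  }
}

Once the mapping already exists, the connector shouldn't need to issue its own typed put-mapping request, which is what triggers the include_type_name error.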

OneCricketeer
  • I think the JDBC connector is not creating the correct schema. I tried printing the key schema and for a few fields it does not look right, e.g. `"field": "VERSION"`. I have edited my question with the schema that the JDBC connector generates. If I have to create the schema upfront in Elasticsearch, can I use the same schema, or do I have to change anything here? – Sudarshan kumar Jan 03 '20 at 14:32
  • I would need to see your MySQL table definition to know whether that is correct or not. And no, Elasticsearch has its own property types. You can only use the Connect schema there as a reference. https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping.html#_explicit_mappings – OneCricketeer Jan 03 '20 at 14:43
  • Okay, so numeric types are mapped to bytes in Connect. It's not wrong, per se, and I'm not sure if it's directly "fixable" or not. At least, not without transforming the message within the Elastic sink configuration or somehow making the Elasticsearch mapping parse the bytes as a number – OneCricketeer Jan 03 '20 at 14:57
  • Making this change, `"numeric.mapping": "best_fit"`, also did not do anything. What if I change to Elasticsearch version 6? – Sudarshan kumar Jan 03 '20 at 18:20
  • Anything above version 5 will return the same message – OneCricketeer Jan 04 '20 at 01:22
  • How can I solve this issue? The same issue comes up with DMS from Aurora to Elasticsearch – Sudarshan kumar Jan 04 '20 at 06:43
  • I already told you. Create the mapping yourself. It has nothing to do with DMS or Kafka Connect – OneCricketeer Jan 04 '20 at 09:20
  • Can you please guide me with that? Shall I create a new question for that? I have a sample mapping; I just need a little guidance with it – Sudarshan kumar Jan 04 '20 at 11:00
  • What issues are you having following the above link? – OneCricketeer Jan 04 '20 at 16:17
  • If I have created the schema mapping in Kibana with the name fsp-audit, do I have to mention this name somewhere? It should be somewhere in the connector, but where should I mention it? I did not find any property for specifying indices. – Sudarshan kumar Jan 04 '20 at 18:27
  • Can you please elaborate on how I would define the index mapping myself? If I create the schema mapping in Elasticsearch, how will the connector pick it up? – Sudarshan kumar Jan 05 '20 at 09:56