1

I have to send records from Aurora/Mysql to MSK and from there to Elastic search service

Aurora -->Kafka-connect--->AWS MSK--->kafka connect --->Elastic search

The record in Aurora table structure is something like this
I think record will go to AWS MSK in this format.

"o36347-5d17-136a-9749-Oe46464",0,"NEW_CASE","WRLDCHK","o36347-5d17-136a-9749-Oe46464","<?xml version=""1.0"" encoding=""UTF-8"" standalone=""yes""?><caseCreatedPayload><batchDetails/>","CASE",08-JUL-17 10.02.32.217000000 PM,"TIME","UTC","ON","0a348753-5d1e-17a2-9749-3345,MN4,","","0a348753-5d1e-17af-9749-FGFDGDFV","EOUHEORHOE","2454-5d17-138e-9749-setwr23424","","","",,"","",""

So in order to consume by elastic search i need to use proper schema so schema registry i have to use.

My question

Question 1

How should i use schema registry for above type of message schema registry is required ?. Do i have to create JSON structure for this and if yes where i have keep that. More help required here to understand this ?

I have edited

vim /usr/local/confluent/etc/schema-registry/schema-registry.properties

Mentioned zookeper but i did not what is kafkastore.topic=_schema How to link this to custom schema .

Even i started and got this error

Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic _schemas not present in metadata after 60000 ms.

Which i was expecting because i did not do anything about schema .

I do have jdbc connector installed and when i start i get below error

Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`

Question 2 Can i create two onnector on one ec2 (jdbc and elastic serach one ).If yes do i have to start both in sepearte cli ?

Question 3 When i open vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties I see only propeties value like below

name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
mode=incrementing
incrementing.column.name=id
topic.prefix=trf-aurora-fspaudit-

In the above properties file where i can mention schema name and table name?

Based on answer i am updating my configuration for Kafka connect JDBC

---------------start JDBC connect elastic search -----------------------------

wget /usr/local http://packages.confluent.io/archive/5.2/confluent-5.2.0-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-5.2.0-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-5.2.0 /usr/local/confluent

wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
tar -xzf  mysql-connector-java-5.1.48.tar.gz
sudo mv mysql-connector-java-5.1.48 mv /usr/local/confluent/share/java/kafka-connect-jdbc

And then

vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties

Then i modified below properties

connection.url=jdbc:mysql://fdgfgdfgrter.us-east-1.rds.amazonaws.com:3306/trf
mode=incrementing
connection.user=admin
connection.password=Welcome123
table.whitelist=PANStatementInstanceLog
schema.pattern=dbo

Last i modified

vim /usr/local/confluent/etc/kafka/connect-standalone.properties

and here i modified below properties

bootstrap.servers=b-3.205147-ertrtr.erer.c5.ertert.us-east-1.amazonaws.com:9092,b-6.ertert-riskaudit.ertet.c5.kafka.us-east-1.amazonaws.com:9092,b-1.ertert-riskaudit.ertert.c5.kafka.us-east-1.amazonaws.com:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java

When i list topic i do not see any topic listed for table name .

Stack trace for the error message

[2020-01-03 07:40:57,169] ERROR Failed to create job for /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties (org.apache.kafka.connect.cli.ConnectStandalone:108)
[2020-01-03 07:40:57,169] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:119)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
        at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:116)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
        at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:423)
        at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
        at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)

        curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IPaddressOfKCnode:8083/connectors/ -d '{"name": "emp-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd","table.whitelist": "emp","mode": "timestamp","topic.prefix": "mysql-" } }'
Sudarshan kumar
  • 1,503
  • 4
  • 36
  • 83
  • One question: how does data get **into** Aurora to begin with? – OneCricketeer Jan 01 '20 at 11:21
  • @cricket_007 i am using DMS to put data into aurora .. – Sudarshan kumar Jan 01 '20 at 12:35
  • So, from another database? Why do you need Aurora, then if you could setup kafka connect from that database? Or how does data get **there**? Why can't you just replace/add a Kafka producer wherever the source of the record is? – OneCricketeer Jan 01 '20 at 12:40
  • @cricket_007 i am pushing data from on prem data base to Aurora .So DMS is used for that purpose .And this set up is completely on cloud . – Sudarshan kumar Jan 01 '20 at 12:46
  • I'm still not sure I understand what DMS is for. Is that cheaper than using Debezium locally to pull data into MSK Kafka? Until your apps are fully in the cloud, you'll need to keep running DMS, right? So, why not refactor your apps to write directly to Kafka, then use JDBC **sink** with Aurora in addition to the Elastic sink to put data in both places – OneCricketeer Jan 01 '20 at 15:57
  • Could you please answer my last comment here? – OneCricketeer Jan 03 '20 at 14:20
  • So want to migrate database from on prem to aws dms is used in order to achieve that .Also we are making data sink to kinesis and also in S3 .Debezium can also be used here but in our on prem we dont have kafka set up and in aws we are just starting up . – Sudarshan kumar Jan 03 '20 at 14:24
  • You don't need Kafka on prem. You could allow Debezium in EC2 to connect and pull your on prem database to MSK – OneCricketeer Jan 03 '20 at 14:28
  • @cricket_007 yes i agree but DMS is already in picture so we are trying to evaluate from there itself .Even we might end up doing what you are suggesting .But i feel the solution should work for on prem and aws both . – Sudarshan kumar Jan 03 '20 at 14:31

2 Answers2

2

I'm guessing that you're planning to use AVRO in order to transfer data so don't forget to specify AVROConverter as the default converter when you start up your Kafka Connect workers. If you will use JSON then Schema Registry is not needed.

1.1 kafkastore.topic=_schema

Have you started up your own schema registry? When you start Schema Registry you'll have to specify the "schemas" topic. Basically, this topic will be used by Schema Registry to store the schemas registered by it and in case of a failure, it can recover them from there.

1.2 jdbc connector installed and when i start i get below error By default, JDBC Connector only works with SQLite and PostgreSQL. If you would like it to work with a MySQL database then you should add the MySQL Driver to the classpath as well.

2.It depends on how you are deploying your Kafka Connect workers. If you go for Distributed mode ( recommended ) then you don't really need separate CLI's. You can deploy your connectors through the Kafka Connect REST API.

3.There is another property called table.whitelist on which you can specify your schemas and tables. e.g: table.whitelistusers,products,transactions

BogdanSucaciu
  • 884
  • 6
  • 13
  • MySQL Driver to the classpath means `plugin.path=/usr/local/confluent/share/java` ? – Sudarshan kumar Jan 01 '20 at 12:50
  • 1
    that's correct! using the `plugin.path` property you're defining a path from where Kafka Connect can load external dependencies ( JAR's ). Here you can add other connectors and dependencies of those connectors ( e.g: drivers, loggers, API clients, etc. ) – BogdanSucaciu Jan 01 '20 at 14:29
  • JDBC drivers are not pulled as plugins, only connectors, transforns, converters, etc – OneCricketeer Jan 01 '20 at 16:01
  • i did not get any properties where we need to put kafka topic name ..I have updated my question as well ..Please have a look – Sudarshan kumar Jan 02 '20 at 06:44
  • @SUDARSHAN: that's because the topics follow a naming convention. Considering the JDBC Source Connector then the topic will have the same name as the table. You can also add the `topic.prefix` property which will prefix table names. e.g: "topic.prefix:mysql-" and the table is called "users" then the topic will be called "mysql-users". @cricket_007: I'm not sure that I understood your remark properly, can you please elaborate? – BogdanSucaciu Jan 02 '20 at 14:27
  • Okk so in that case i dont need to give topic name ? But why i am not able to see records from Aurora to MSK ...Everthing seems fine and connectors also starting without any error – Sudarshan kumar Jan 02 '20 at 14:42
  • I had updated my question with all details of properties file ..Can you please have a look...as you said i have updated few more properties as well like table names and schema .. – Sudarshan kumar Jan 02 '20 at 14:46
2

schema registry is required ?

No. You can enable schemas in json records. JDBC source can create them for you based on the table information

value.converter=org.apache.kafka...JsonConverter 
value.converter.schemas.enable=true

Mentioned zookeper but i did not what is kafkastore.topic=_schema

If you want to use Schema Registry, you should be using kafkastore.bootstrap.servers.with the Kafka address, not Zookeeper. So remove kafkastore.connection.url

Please read the docs for explanations of all properties

i did not do anything about schema .

Doesn't matter. The schemas topic gets created when the Registry first starts

Can i create two onnector on one ec2

Yes (ignoring available JVM heap space). Again, this is detailed in the Kafka Connect documentation.

Using standalone mode, you first pass the connect worker configuration, then up to N connector properties in one command

Using distributed mode, you use the Kafka Connect REST API

https://docs.confluent.io/current/connect/managing/configuring.html

When i open vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties

First of all, that's for Sqlite, not Mysql/Postgres. You don't need to use the quickstart files, they are only there for reference

Again, all properties are well documented

https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc

I do have jdbc connector installed and when i start i get below error

Here's more information about how you can debug that

https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/


As stated before, I would personally suggest using Debezium/CDC where possible

Debezium Connector for RDS Aurora

OneCricketeer
  • 179,855
  • 19
  • 132
  • 245
  • But as you can see my records are not JSON.So even then i dont need schema registry ? – Sudarshan kumar Jan 01 '20 at 14:11
  • 1
    Huh? JDBC records **become** JSON if using the JDBC source with the JSON Converter. They **become** Avro when using the AvroConverter. Only the AvroConverter *requires* Schema Registry settings... I'm not sure I can make that clearer – OneCricketeer Jan 01 '20 at 16:05
  • After all set up in fresh EC2 .I am getting below error `Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.` – Sudarshan kumar Jan 01 '20 at 16:24
  • Which component reports that? Elasticsearch sink? Did you not use the properties I mentioned in my answer for the jdbc source? Why do you need a fresh EC2? You're just editing property files and remote kafka topics – OneCricketeer Jan 01 '20 at 16:27
  • Yes Elasticsearch sink ..Yes i used same properties exactly . – Sudarshan kumar Jan 01 '20 at 16:29
  • Can you please edit your question to include the messages in the topic from the JDBC source as well as your Connect and JDBC connector configuration files? – OneCricketeer Jan 01 '20 at 18:21
  • updated myquestion for JDBC ..I will update for elastci search also ..But jdbc also not working ..I can not see any data – Sudarshan kumar Jan 02 '20 at 07:42
  • I have updated my question at bottom for elastic search connector also .Please have a look – Sudarshan kumar Jan 02 '20 at 08:03
  • You need to pass all 3 property files to connect-standalone. The worker, JDBC, and Elasticsearch. But only run the Elasticsearch one if you're able to run the JDBC one by itself first. Do you get errors when you do that? – OneCricketeer Jan 02 '20 at 21:35
  • How should pass 3 The worker, JDBC, and Elasticsearch ? i m achnging only 2 propeties file and then run also with only 2 ..Can you please help here . – Sudarshan kumar Jan 03 '20 at 02:50
  • Can you please example of pass all 3 property files to connect-standalone ? – Sudarshan kumar Jan 03 '20 at 02:54
  • if you run connect-standalone by itself without any property files, you can see its usage example takes a list of property files, not just two – OneCricketeer Jan 03 '20 at 06:00
  • I am sorry but i am not able to crack it JDBC part ..Please help me with this..I am now more confused with JDCB part – Sudarshan kumar Jan 03 '20 at 06:58
  • Also i do not see worker.properties file at /usr/local/confluent/etc/kafka location . – Sudarshan kumar Jan 03 '20 at 07:27
  • connect-standalone.properties **is** the worker properties `connect-standalone connect-standalone.properties jdbc.properties elasticsearch.properties`. The files can be stored anywhere on the machine, they don't need to be in the etc folders. In fact, I strongly suggest putting them into github, for example – OneCricketeer Jan 03 '20 at 07:40
  • Then what i am doing is correct i am running this but at last after few minute it fails `Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure` – Sudarshan kumar Jan 03 '20 at 07:53
  • Again, please edit your question to include the full stacktrace – OneCricketeer Jan 03 '20 at 07:55
  • Hi I have updated the stack trace and also my EC2 kafka and is in same VPC and security group .. – Sudarshan kumar Jan 03 '20 at 08:19
  • I'm confused by that last line of what you posted. The properties shown there don't match your file at all – OneCricketeer Jan 03 '20 at 08:33
  • 1
    At this point, though, your database configuration needs fixed somehow. It's not a Kafka problem anymore. And like I said, please search your errors https://stackoverflow.com/questions/2983248/com-mysql-jdbc-exceptions-jdbc4-communicationsexception-communications-link-fai – OneCricketeer Jan 03 '20 at 08:35
  • you were correct ...I am able to connect to RDS but getting this error ` WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 1721` – Sudarshan kumar Jan 03 '20 at 10:36