I have to send records from Aurora/MySQL to MSK, and from there to the Elasticsearch service.
Aurora --> Kafka Connect --> AWS MSK --> Kafka Connect --> Elasticsearch
The record in the Aurora table structure is something like this. I think the record will go to AWS MSK in this format:
"o36347-5d17-136a-9749-Oe46464",0,"NEW_CASE","WRLDCHK","o36347-5d17-136a-9749-Oe46464","<?xml version=""1.0"" encoding=""UTF-8"" standalone=""yes""?><caseCreatedPayload><batchDetails/>","CASE",08-JUL-17 10.02.32.217000000 PM,"TIME","UTC","ON","0a348753-5d1e-17a2-9749-3345,MN4,","","0a348753-5d1e-17af-9749-FGFDGDFV","EOUHEORHOE","2454-5d17-138e-9749-setwr23424","","","",,"","",""
So in order for Elasticsearch to consume the records, I need a proper schema, which is why I have to use Schema Registry.
My questions
Question 1
How should I use Schema Registry for the above type of message, and is Schema Registry required? Do I have to create a JSON structure for this, and if so, where do I keep it? I need more help to understand this.
I have edited
vim /usr/local/confluent/etc/schema-registry/schema-registry.properties
I mentioned the ZooKeeper address there, but I don't know what kafkastore.topic=_schema is.
How do I link this to a custom schema?
When I started it anyway, I got this error:
Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic _schemas not present in metadata after 60000 ms.
Which I was expecting, because I did not do anything about the schema.
I do have the JDBC connector installed, and when I start it I get the error below:
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
Invalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123 for configuration Couldn't open connection to jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
Question 2: Can I create two connectors on one EC2 instance (a JDBC one and an Elasticsearch one)? If yes, do I have to start both in separate CLI sessions?
Question 3: When I open vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties I only see property values like the ones below:
name=test-source-sqlite-jdbc-autoincrement
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://123871289-eruyre.cluster-ceyey.us-east-1.rds.amazonaws.com:3306/trf?user=admin&password=Welcome123
mode=incrementing
incrementing.column.name=id
topic.prefix=trf-aurora-fspaudit-
Where in the above properties file can I mention the schema name and the table name?
Based on the answer, I am updating my Kafka Connect JDBC configuration.
--------------- start JDBC Connect + Elasticsearch setup ---------------
wget http://packages.confluent.io/archive/5.2/confluent-5.2.0-2.11.tar.gz -P ~/Downloads/
tar -zxvf ~/Downloads/confluent-5.2.0-2.11.tar.gz -C ~/Downloads/
sudo mv ~/Downloads/confluent-5.2.0 /usr/local/confluent
wget https://cdn.mysql.com//Downloads/Connector-J/mysql-connector-java-5.1.48.tar.gz
tar -xzf mysql-connector-java-5.1.48.tar.gz
sudo mv mysql-connector-java-5.1.48 /usr/local/confluent/share/java/kafka-connect-jdbc
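If the "No suitable driver" error persists after this, note that Connect needs the driver JAR itself on the plugin path, not just the extracted directory; copying the JAR out of the directory may help (the exact JAR name inside the tarball can vary by version, so this path is an assumption):

sudo cp /usr/local/confluent/share/java/kafka-connect-jdbc/mysql-connector-java-5.1.48/mysql-connector-java-5.1.48.jar /usr/local/confluent/share/java/kafka-connect-jdbc/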
And then
vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
Then I modified the properties below:
connection.url=jdbc:mysql://fdgfgdfgrter.us-east-1.rds.amazonaws.com:3306/trf
mode=incrementing
connection.user=admin
connection.password=Welcome123
table.whitelist=PANStatementInstanceLog
schema.pattern=dbo
Lastly, I modified
vim /usr/local/confluent/etc/kafka/connect-standalone.properties
and here I modified the properties below:
bootstrap.servers=b-3.205147-ertrtr.erer.c5.ertert.us-east-1.amazonaws.com:9092,b-6.ertert-riskaudit.ertet.c5.kafka.us-east-1.amazonaws.com:9092,b-1.ertert-riskaudit.ertert.c5.kafka.us-east-1.amazonaws.com:9092
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/usr/local/confluent/share/java
When I list topics, I do not see any topic listed for the table name.
Stack trace for the error message:
[2020-01-03 07:40:57,169] ERROR Failed to create job for /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties (org.apache.kafka.connect.cli.ConnectStandalone:108)
[2020-01-03 07:40:57,169] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:119)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:116)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 2 error(s):
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
Invalid value com.mysql.jdbc.exceptions.jdbc4.CommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server. for configuration Couldn't open connection to jdbc:mysql://****.us-east-1.rds.amazonaws.com:3306/trf
You can also find the above list of errors at the endpoint `/{connectorType}/config/validate`
at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:423)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:188)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:113)
curl -X POST -H "Accept:application/json" -H "Content-Type:application/json" IPaddressOfKCnode:8083/connectors/ -d '{"name": "emp-connector", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "tasks.max": "1", "connection.url": "jdbc:mysql://IPaddressOfLocalMachine:3306/test_db?user=root&password=pwd","table.whitelist": "emp","mode": "timestamp","topic.prefix": "mysql-" } }'
> Is Schema Registry required?
No. You can enable schemas in JSON records, and the JDBC source can create them for you based on the table information:
value.converter=org.apache.kafka...JsonConverter
value.converter.schemas.enable=true
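For illustration, with schemas.enable=true the JsonConverter (the full class name is org.apache.kafka.connect.json.JsonConverter) wraps every record in a schema/payload envelope. A rough sketch of what a row would look like on the topic (the field names here are made up, not your actual columns):

{
  "schema": {
    "type": "struct",
    "name": "PANStatementInstanceLog",
    "fields": [
      {"field": "id", "type": "int64", "optional": false},
      {"field": "case_type", "type": "string", "optional": true}
    ],
    "optional": false
  },
  "payload": {"id": 1, "case_type": "NEW_CASE"}
}

The Elasticsearch sink can read this envelope back as long as its worker uses the same converter settings.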
> I mentioned the ZooKeeper address there, but I don't know what kafkastore.topic=_schema is.
If you want to use Schema Registry, you should be using kafkastore.bootstrap.servers with the Kafka broker addresses, not ZooKeeper. So remove kafkastore.connection.url.
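As a sketch, schema-registry.properties pointed at MSK would look something like this (the broker address is a placeholder for one of your MSK endpoints):

listeners=http://0.0.0.0:8081
kafkastore.bootstrap.servers=PLAINTEXT://b-1.yourcluster.kafka.us-east-1.amazonaws.com:9092
# _schemas is just the internal compacted topic where the Registry stores schemas
kafkastore.topic=_schemas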
Please read the docs for explanations of all the properties.
> I did not do anything about the schema.
Doesn't matter. The _schemas topic gets created when the Registry first starts.
> Can I create two connectors on one EC2 instance?
Yes (ignoring available JVM heap space). Again, this is detailed in the Kafka Connect documentation.
Using standalone mode, you first pass the Connect worker configuration, then up to N connector properties files, all in one command.
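For example, one standalone worker can run both the source and the sink (the two connector file names here are hypothetical):

/usr/local/confluent/bin/connect-standalone \
  /usr/local/confluent/etc/kafka/connect-standalone.properties \
  jdbc-source.properties elasticsearch-sink.properties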
Using distributed mode, you use the Kafka Connect REST API
https://docs.confluent.io/current/connect/managing/configuring.html
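In distributed mode, the Elasticsearch sink would be registered with a call along these lines (the connector name, topic, and Elasticsearch endpoint are assumptions, not values from your setup):

curl -X POST -H "Content-Type: application/json" http://localhost:8083/connectors -d '{
  "name": "es-sink",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "trf-aurora-fspaudit-PANStatementInstanceLog",
    "connection.url": "https://your-es-domain.us-east-1.es.amazonaws.com",
    "type.name": "_doc",
    "key.ignore": "true"
  }
}'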
> When I open vim /usr/local/confluent/etc/kafka-connect-jdbc/source-quickstart-sqlite.properties
First of all, that's for SQLite, not MySQL/Postgres. You don't need to use the quickstart files; they are only there for reference.
Again, all properties are well documented
https://docs.confluent.io/current/connect/kafka-connect-jdbc/index.html#connect-jdbc
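Putting those documented properties together, a minimal MySQL source file would be roughly this (the endpoint, credentials, and prefix are placeholders; note the credentials go in connection.user/connection.password rather than in the URL):

name=mysql-source-trf
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
# plain JDBC URL; credentials go in the dedicated properties below
connection.url=jdbc:mysql://your-aurora-endpoint.us-east-1.rds.amazonaws.com:3306/trf
connection.user=admin
connection.password=*****
table.whitelist=PANStatementInstanceLog
mode=incrementing
incrementing.column.name=id
# topic name becomes trf-aurora-fspaudit-PANStatementInstanceLog
topic.prefix=trf-aurora-fspaudit-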
> I do have the JDBC connector installed, and when I start it I get the error below
Here's more information about how you can debug that
https://www.confluent.io/blog/kafka-connect-deep-dive-jdbc-source-connector/
As stated before, I would personally suggest using Debezium/CDC where possible
Debezium Connector for RDS Aurora
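For completeness, a hedged sketch of a Debezium MySQL source configuration (property names follow the Debezium 1.x docs; all hosts, IDs, and names below are placeholders, and Aurora must have the binlog enabled for CDC to work):

name=aurora-cdc-source
connector.class=io.debezium.connector.mysql.MySqlConnector
database.hostname=your-aurora-endpoint.us-east-1.rds.amazonaws.com
database.port=3306
database.user=admin
database.password=*****
# unique numeric ID for this connector acting as a replication client
database.server.id=12345
# logical name; becomes the topic prefix
database.server.name=trf-aurora
table.whitelist=trf.PANStatementInstanceLog
database.history.kafka.bootstrap.servers=b-1.yourcluster.kafka.us-east-1.amazonaws.com:9092
database.history.kafka.topic=schema-changes.trf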