問題描述
我正在嘗試使用 Apache Kafka 將事件從
雖然插入和更新工作正常,但我無法理解了解如何從 MySQL
流式傳輸到 PostgreSQL
.MySQL
中刪除記錄并將此事件流式傳輸到 PostgreSQL
.
I am trying to stream events from MySQL
to PostgreSQL
using Apache Kafka.
Although insertions and updates work fine, I can't figure out how to delete a record from MySQL
and stream this event to PostgreSQL
.
假設以下拓撲:
+-------------+
| |
| MySQL |
| |
+------+------+
|
|
|
+---------------v------------------+
| |
| Kafka Connect |
| (Debezium, JDBC connectors) |
| |
+---------------+------------------+
|
|
|
|
+-------v--------+
| |
| PostgreSQL |
| |
+----------------+
我正在使用以下 docker 鏡像;
I am using the following docker images;
- Apache-Zookeper
- Apache-Kafka
- Debezium/JDBC 連接器
然后
# Start the application
export DEBEZIUM_VERSION=0.6
docker-compose up
# Start PostgreSQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
# Start MySQL connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @source.json
這里是MySQL數據庫的內容;
Here is the content of MySQL database;
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
| 1004 | Anne | Kretchmar | annek@noanswer.org |
+------+------------+-----------+-----------------------+
并且我們可以驗證PostgresSQL的內容是一樣的;
And we can verify that the content of PostgresSQL is identical;
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org
(4 rows)
假設我想從 MySQL 數據庫中刪除 id=1004
的記錄;
Assume that I want to delete the record with id=1004
from MySQL database;
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory'
mysql> delete from customers where id = 1004;
docker-compose -f docker-compose-jdbc.yaml exec mysql bash -c 'mysql -u $MYSQL_USER -p$MYSQL_PASSWORD inventory -e "select * from customers"'
+------+------------+-----------+-----------------------+
| id | first_name | last_name | email |
+------+------------+-----------+-----------------------+
| 1001 | Sally | Thomas | sally.thomas@acme.com |
| 1002 | George | Bailey | gbailey@foobar.com |
| 1003 | Edward | Walker | ed@walker.com |
+------+------------+-----------+-----------------------+
雖然從 MySQL 中刪除了該記錄,但該條目仍然出現在 PostgresSQL 中
Although the record is deleted from MySQL, the entry still appears in PostgresSQL
docker-compose -f docker-compose-jdbc.yaml exec postgres bash -c 'psql -U $POSTGRES_USER $POSTGRES_DB -c "select * from customers"'
last_name | id | first_name | email
-----------+------+------------+-----------------------
Thomas | 1001 | Sally | sally.thomas@acme.com
Bailey | 1002 | George | gbailey@foobar.com
Walker | 1003 | Edward | ed@walker.com
Kretchmar | 1004 | Anne | annek@noanswer.org
(4 rows)
我知道支持軟刪除,但是是否可以從 PostgresSQL
中完全刪除該特定條目(通過 Apache-Kafka 從 MySQL 流式傳輸 del 事件)?
I understand that soft deletes are supported however, is it possible to completely delete that particular entry from PostgresSQL
as well (by streaming the del event from MySQL via Apache-Kafka)?
這是source.json
文件的內容
{
"name": "inventory-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "mysql",
"database.port": "3306",
"database.user": "debezium",
"database.password": "dbz",
"database.server.id": "184054",
"database.server.name": "dbserver1",
"database.whitelist": "inventory",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.inventory",
"transforms": "route",
"transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
"transforms.route.replacement": "$3"
}
}
這里是jdbc-sink.json
文件的內容
{
"name": "jdbc-sink",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "customers",
"connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope",
"auto.create": "true",
"insert.mode": "upsert",
"pk.fields": "id",
"pk.mode": "record_value"
}
}
我也嘗試設置 "pk.mode": "record_key"
和 "delete.enabled": "true"
(錯誤修復建議) 但這種修改似乎不起作用.
I have also tried to set "pk.mode": "record_key"
and "delete.enabled": "true"
(bug fix suggestion) but this modification doesn't seem to work.
推薦答案
Confluent JDBC 接收器連接器當前不支持刪除.有一個待處理的拉取請求(您已鏈接到它),但尚未合并.
Deletes are currently not supported by the Confluent JDBC sink connector. There's a pending pull request (you already linked to it), but this hasn't been merged yet.
目前,您可以自己基于該分支構建 JDBC 接收器連接器,也可以創建一個簡單的自定義接收器連接器,該連接器通過在目標數據庫上執行相應的 DELETE 語句來處理邏輯刪除事件.
For the time being, you could either build the JDBC sink connector based on that branch yourself, or you create a simple custom sink connector which just handles tombstone events by executing a corresponding DELETE statement on the target database.
這篇關于通過 Apache-kafka 將刪除事件從 MySQL 流式傳輸到 PostgreSQL的文章就介紹到這了,希望我們推薦的答案對大家有所幫助,也希望大家多多支持html5模板網!