Problem Description
I want to build a real-time data pipeline in Apache Kafka. I have a database located at a remote site, and that database is continuously being updated. Which Kafka Connect API should I use to pull the data from the database and ingest it into the Kafka broker in real time? Later on I would use Kafka Streams and KSQL to run ad-hoc queries to compute the metrics.
Any help would be highly appreciated!
If you want to create a real-time data pipeline, you need to use a Change Data Capture (CDC) tool that can stream changes from MySQL. I would suggest Debezium, an open-source distributed platform for change data capture.
Capturing Inserts
When a new record is added to a table, a JSON similar to the one below will be produced:
{
  "payload":{
    "before":null,
    "after":{
      "id":1005,
      "first_name":"Giorgos",
      "last_name":"Myrianthous",
      "email":"giorgos@abc.com"
    },
    "source":{
      "name":"dbserver1",
      "server_id":223344,
      "ts_sec":1500369632,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":364,
      "row":0,
      "snapshot":null,
      "thread":13,
      "db":"inventory",
      "table":"customers"
    },
    "op":"c",
    "ts_ms":1500369632095
  }
}
The before object is null and the after object contains the newly inserted values. Note that the op attribute is c, indicating that this was a CREATE event.
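If you consume these change events with a plain Kafka consumer, the fields of interest can be pulled out with any JSON library. Below is a minimal sketch using Jackson; the class and method names are illustrative and assume Jackson is on the classpath (they are not part of Debezium itself).

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ChangeEventParser {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    // Extracts the operation type ("c", "u" or "d") and the row images
    // from the value of a Debezium change event like the one above.
    public static void inspect(String json) throws Exception {
        JsonNode payload = MAPPER.readTree(json).path("payload");
        String op = payload.path("op").asText();   // "c" for a CREATE event
        JsonNode before = payload.path("before");  // null for inserts
        JsonNode after = payload.path("after");    // the newly inserted row
        System.out.printf("op=%s before=%s after=%s%n", op, before, after);
    }
}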
Capturing Updates
Assuming that the email attribute has been updated, a JSON similar to the one below will be produced:
{
  "payload":{
    "before":{
      "id":1005,
      "first_name":"Giorgos",
      "last_name":"Myrianthous",
      "email":"giorgos@abc.com"
    },
    "after":{
      "id":1005,
      "first_name":"Giorgos",
      "last_name":"Myrianthous",
      "email":"newEmail@abc.com"
    },
    "source":{
      "name":"dbserver1",
      "server_id":223344,
      "ts_sec":1500369929,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":673,
      "row":0,
      "snapshot":null,
      "thread":13,
      "db":"inventory",
      "table":"customers"
    },
    "op":"u",
    "ts_ms":1500369929464
  }
}
Notice op, which is now u, indicating that this was an UPDATE event. The before object shows the row state before the update and the after object captures the current state of the updated row.
Capturing Deletes
Now assume that the row has been deleted:
{
  "payload":{
    "before":{
      "id":1005,
      "first_name":"Giorgos",
      "last_name":"Myrianthous",
      "email":"newEmail@abc.com"
    },
    "after":null,
    "source":{
      "name":"dbserver1",
      "server_id":223344,
      "ts_sec":1500370394,
      "gtid":null,
      "file":"mysql-bin.000003",
      "pos":1025,
      "row":0,
      "snapshot":null,
      "thread":13,
      "db":"inventory",
      "table":"customers"
    },
    "op":"d",
    "ts_ms":1500370394589
  }
}
op is now equal to d, indicating a DELETE event. The after attribute will be null and the before object contains the row before it was deleted.
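Since you mention Kafka Streams, here is a rough sketch of how the op field could feed a metric. It assumes Debezium publishes the customers table to a topic named dbserver1.inventory.customers (Debezium derives topic names as serverName.databaseName.tableName) and that the values are plain JSON strings; the application id and the metric itself are placeholders. A real application would parse the JSON (for example with the parser sketched earlier) rather than string-match the op field.

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class ChangeEventMetrics {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cdc-metrics");    // placeholder application id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");  // same broker the connector writes to
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // Debezium topic name: serverName.databaseName.tableName
        KStream<String, String> events = builder.stream("dbserver1.inventory.customers");

        // Keep only DELETE events ("op":"d") as a trivial example of a metric input.
        events.filter((key, value) -> value != null && value.contains("\"op\":\"d\""))
              .foreach((key, value) -> System.out.println("row deleted: " + value));

        new KafkaStreams(builder.build(), props).start();
    }
}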
You can also have a look at the extensive tutorial provided on their website.
EDIT: Example configuration for a MySQL database
{
  "name": "inventory-connector", (1)
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector", (2)
    "database.hostname": "192.168.99.100", (3)
    "database.port": "3306", (4)
    "database.user": "debezium", (5)
    "database.password": "dbz", (6)
    "database.server.id": "184054", (7)
    "database.server.name": "fullfillment", (8)
    "database.whitelist": "inventory", (9)
    "database.history.kafka.bootstrap.servers": "kafka:9092", (10)
    "database.history.kafka.topic": "dbhistory.fullfillment", (11)
    "include.schema.changes": "true" (12)
  }
}
1 The name of our connector when we register it with a Kafka Connect service.
2 The name of this MySQL connector class.
3 The address of the MySQL server.
4 The port number of the MySQL server.
5 The name of the MySQL user that has the required privileges.
6 The password for the MySQL user that has the required privileges.
7 The connector’s identifier that must be unique within the MySQL cluster and similar to MySQL’s server-id configuration property.
8 The logical name of the MySQL server/cluster, which forms a namespace and is used in all the names of the Kafka topics to which the connector writes, the Kafka Connect schema names, and the namespaces of the corresponding Avro schema when the Avro Connector is used.
9 A list of all databases hosted by this server that this connector will monitor. This is optional, and there are other properties for listing the databases and tables to include or exclude from monitoring.
10 The list of Kafka brokers that this connector will use to write and recover DDL statements to the database history topic.
11 The name of the database history topic where the connector will write and recover DDL statements. This topic is for internal use only and should not be used by consumers.
12 The flag specifying that the connector should generate events on the schema change topic named fullfillment with the DDL changes that can be used by consumers.
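Once a Kafka Connect worker is running, the configuration above (without the (1) to (12) callout markers) is registered by POSTing it to the Connect REST API. Below is a hedged sketch using Java's built-in HTTP client; the worker URL (8083 is the default Connect REST port) and the file name are assumptions about your setup.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class RegisterConnector {
    public static void main(String[] args) throws Exception {
        // The connector configuration shown above, saved as inventory-connector.json
        String config = Files.readString(Path.of("inventory-connector.json"));

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors"))   // default Connect REST endpoint
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(config))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}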