Problem Description
Hi, I am trying to build a Kafka Connect pipeline with PostgreSQL as the source and SQL Server as the destination. I use 3 Kafka brokers and need to consume 252 topics (one topic per PostgreSQL table). After running for more than an hour, the pipeline can only pull 218 out of the 252 tables. The error I found is a deadlock in SQL Server, which holds the transaction to SQL Server and makes the connector retry it; the Debezium replication slot also already exists.
I use distributed connectors with a maximum of 3 workers on the sink, but that does not seem to be enough. I have also tried a higher offset.time_out.ms of 60000 and a higher offset partition count (100). I'm afraid this is not the production level I want. Can anyone give a suggestion about this case? Is there any calculation to decide the best number of workers that I need?
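For reference, the worker settings mentioned above correspond roughly to the excerpt below. This is an illustrative sketch only: it assumes that "offset.time_out.ms" refers to the standard Kafka Connect distributed worker property offset.flush.timeout.ms, and that the offset partition count maps to offset.storage.partitions.

# connect-distributed.properties (excerpt, illustrative values)
bootstrap.servers=broker1:9092,broker2:9092,broker3:9092
group.id=connect-cluster
# allow up to 60 s for offset flushes before the commit attempt is cancelled
offset.flush.timeout.ms=60000
# internal offsets topic created with 100 partitions (default is 25)
offset.storage.partitions=100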
Update
Here are some of the errors I get. I see that some connectors are killed. One tells me that a deadlock happened in SQL Server:
[2020-03-26 15:06:28,494] ERROR WorkerSinkTask{id=sql_server_sink_XXA-0} RetriableException from SinkTask: (org.apache.kafka.connect.runtime.WorkerSinkTask:552)
org.apache.kafka.connect.errors.RetriableException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:93)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:539)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:322)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:224)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:192)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: Transaction (Process ID 62) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.
Update (April 14, 2020)
I still have this problem. I forgot to mention how I deploy the connectors: I now use 2 workers, one for the source and one for the sink. I list all of my tables and primary keys in a CSV and loop through the rows to create the connectors, without any sleep or wait between them. I also use a single topic partition and 3 replicas for each topic. But I still get the SQL Server connection deadlock.
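For context, the deployment loop is conceptually like the sketch below. The CSV file name, connector naming scheme, REST endpoint, and sink settings are placeholders rather than the exact configuration, and as described above there is no pause between the requests:

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

public class CreateSinkConnectors {
    public static void main(String[] args) throws Exception {
        HttpClient http = HttpClient.newHttpClient();
        // each CSV row: <table_name>,<primary_key>  (illustrative file name)
        List<String> rows = Files.readAllLines(Path.of("tables.csv"));
        for (String row : rows) {
            String[] parts = row.split(",");
            String table = parts[0];
            String pk = parts[1];
            // minimal JDBC sink config; the real config has more settings (connection, converters, ...)
            String config = "{"
                + "\"name\": \"sql_server_sink_" + table + "\","
                + "\"config\": {"
                + "  \"connector.class\": \"io.confluent.connect.jdbc.JdbcSinkConnector\","
                + "  \"topics\": \"" + table + "\","
                + "  \"pk.mode\": \"record_value\","
                + "  \"pk.fields\": \"" + pk + "\","
                + "  \"tasks.max\": \"1\""
                + "}}";
            HttpRequest req = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:8083/connectors")) // Kafka Connect REST endpoint
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(config))
                .build();
            // fire one request per table, with no sleep between requests
            http.send(req, HttpResponse.BodyHandlers.ofString());
        }
    }
}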
Recommended Answer
The problem may be that multiple tasks access the same SQL table at the same time, causing synchronization problems such as the deadlocks you mentioned.
Since you already have a large number of topics, and your connector can access them in parallel, I would suggest reducing the number of partitions for every topic to just 1 (reducing the partition count is not supported in Kafka, so you should delete and recreate every topic with the new number of partitions).
This way every topic has only one partition, and each partition can only be consumed by a single thread (task/consumer), so there is no chance of parallel SQL transactions against the same table.
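As a rough illustration of the delete-and-recreate step (the broker address and topic names below are assumptions, and the replication factor of 3 matches the setup described in the question), something along these lines with the Kafka AdminClient would do it:

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

public class RecreateTopicsWithOnePartition {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // assumed broker address
        try (AdminClient admin = AdminClient.create(props)) {
            List<String> topics = List.of("public.table_a", "public.table_b"); // your 252 topic names
            // delete the existing topics (note: this discards the data currently in them)
            admin.deleteTopics(topics).all().get();
            // recreate every topic with a single partition, keeping replication factor 3
            List<NewTopic> singlePartitionTopics = topics.stream()
                    .map(name -> new NewTopic(name, 1, (short) 3))
                    .collect(Collectors.toList());
            admin.createTopics(singlePartitionTopics).all().get();
        }
    }
}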
Alternatively, a better approach is to create a single topic with 3 partitions (the same as the number of tasks/consumers you have) and make the producer use the SQL table name as the message key.
Kafka guarantees that messages with the same key always go to the same partition, so all messages for the same table will reside on a single partition (and be consumed by a single thread).
If you find it useful, I can attach more information about how to create a Kafka producer and send keyed messages.
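As a minimal sketch of such a producer (the broker address, topic name, and record contents below are placeholders, not part of the original setup):

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KeyedTableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // assumed broker address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String topic = "all_tables";            // the single topic with 3 partitions
            String tableName = "public.customers";  // the SQL table this change belongs to
            String payload = "{\"id\": 1, \"name\": \"example\"}"; // placeholder row data
            // using the table name as the key means every record for this table
            // hashes to the same partition, so it is consumed by a single task
            producer.send(new ProducerRecord<>(topic, tableName, payload));
            producer.flush();
        }
    }
}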