Problem Description
I am trying to write a custom receiver for Structured Streaming that will consume messages from RabbitMQ. Spark recently released the DataSource V2 API, which seems very promising. Since it abstracts away many details, I want to use this API for the sake of both simplicity and performance. However, since it's quite new, there are not many resources available. I need some clarification from experienced Spark guys, since they will grasp the key points more easily. Here we go:
My starting point is the blog post series, with the first part here. It shows how to implement a data source without streaming capability. To make a streaming source, I slightly changed them, since I need to implement MicroBatchReadSupport instead of (or in addition to) DataSourceV2.
To be efficient, it's wise to have multiple Spark executors consuming from RabbitMQ concurrently, i.e. from the same queue. If I'm not confused, every partition of the input (in Spark's terminology) corresponds to a consumer of the queue (in RabbitMQ terminology). Thus, we need to have multiple partitions for the input stream, right?
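For reference, this is the RabbitMQ behaviour that plan relies on: several consumers registered on the same queue receive messages round-robin, one consumer per message. A minimal standalone sketch with the RabbitMQ Java client (the host and queue name are placeholders, not from the post):

```java
import com.rabbitmq.client.*;

public class MultiConsumerSketch {
    public static void main(String[] args) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");                 // placeholder host
        Connection connection = factory.newConnection();

        for (int i = 0; i < 5; i++) {                 // 5 consumers ~ 5 "partitions"
            final int consumerId = i;
            Channel channel = connection.createChannel();
            channel.queueDeclare("events", true, false, false, null);  // placeholder queue
            channel.basicQos(10);                     // cap unacked messages per consumer
            channel.basicConsume("events", false, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope,
                                           AMQP.BasicProperties properties, byte[] body)
                        throws java.io.IOException {
                    // RabbitMQ delivers each message to exactly one of the consumers (round-robin)
                    System.out.println("consumer " + consumerId + ": " + new String(body, "UTF-8"));
                    getChannel().basicAck(envelope.getDeliveryTag(), false);
                }
            });
        }
    }
}
```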
Similar to part 4 of the series, I implemented MicroBatchReader as follows:
@Override
public List<DataReaderFactory<Row>> createDataReaderFactories() {
    int partition = options.getInt(RMQ.PARTITION, 5);
    List<DataReaderFactory<Row>> factories = new LinkedList<>();
    for (int i = 0; i < partition; i++) {
        factories.add(new RMQDataReaderFactory(options));
    }
    return factories;
}
I am returning a list of factories, and hope that every instance in the list will be used to create a reader, which will also be a consumer. Is that approach correct?
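To make the intent concrete, here is a rough sketch of what each factory/reader pair could look like under that assumption. The RMQDataReaderFactory name comes from the snippet above; the constructor here takes plain strings rather than the options object, and everything else (host, queue name, Row layout) is made up for illustration:

```java
import java.io.IOException;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.reader.DataReader;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.GetResponse;

// Created on the driver, serialized, and sent to an executor.
class RMQDataReaderFactory implements DataReaderFactory<Row> {
    private final String host;   // plain strings so the factory stays serializable
    private final String queue;

    RMQDataReaderFactory(String host, String queue) {
        this.host = host;
        this.queue = queue;
    }

    @Override
    public DataReader<Row> createDataReader() {
        return new RMQDataReader(host, queue);
    }
}

// One reader == one RabbitMQ consumer, living on an executor.
class RMQDataReader implements DataReader<Row> {
    private final Connection connection;
    private final Channel channel;
    private final String queue;
    private GetResponse current;

    RMQDataReader(String host, String queue) {
        try {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(host);
            this.connection = factory.newConnection();
            this.channel = connection.createChannel();
            this.queue = queue;
        } catch (Exception e) {
            throw new RuntimeException("could not connect to RabbitMQ", e);
        }
    }

    @Override
    public boolean next() throws IOException {
        // Pull-based consumption for simplicity; a real source would respect the
        // offset range of the current micro-batch instead of draining the queue.
        current = channel.basicGet(queue, false);  // autoAck = false: ack handling is the open question
        return current != null;
    }

    @Override
    public Row get() {
        return RowFactory.create(new String(current.getBody(), java.nio.charset.StandardCharsets.UTF_8));
    }

    @Override
    public void close() throws IOException {
        try {
            channel.close();
            connection.close();
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
```

Note that the factory only carries plain strings: whatever state it holds has to survive Java serialization on its way to an executor, which is also why the connection is opened inside the reader rather than the factory.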
I want my receiver to be reliable, i.e. after every message is processed (or at least written to the checkpoint directory for further processing), I need to ack it back to RabbitMQ. The problem starts here: these factories are created at the driver, and the actual reading takes place at executors through DataReaders. However, the commit method is a part of MicroBatchReader, not DataReader. Since I have many DataReaders per MicroBatchReader, how should I ack these messages back to RabbitMQ? Or should I ack when the next method is called on a DataReader? Is it safe? If so, what is the purpose of the commit function then?
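The "ack in next()" variant I am asking about would look roughly like this in the reader sketched above (a fragment, not a recommendation; it acknowledges before Spark has committed the batch, which is exactly the safety concern):

```java
@Override
public boolean next() throws IOException {
    current = channel.basicGet(queue, false);
    if (current != null) {
        // Acking here removes the message from RabbitMQ before Spark has committed
        // the batch: a failed or restarted batch can no longer re-read it, so this
        // gives at-most-once delivery rather than the reliable behaviour I want.
        channel.basicAck(current.getEnvelope().getDeliveryTag(), false);
    }
    return current != null;
}
```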
CLARIFICATION (or rather, OBFUSCATION): The link provided in the answer about the renaming of some classes/functions (in addition to the explanations there) made everything worse than ever rather than clearer. Quoting from there:
Renames:

- DataReaderFactory to InputPartition
- DataReader to InputPartitionReader

...
InputPartition's purpose is to manage the lifecycle of the associated reader, which is now called InputPartitionReader, with an explicit create operation to mirror the close operation. This was no longer clear from the API because DataReaderFactory appeared to be more generic than it is and it isn't clear why a set of them is produced for a read.
However, the docs clearly say that "the reader factory will be serialized and sent to executors, then the data reader will be created on executors and do the actual reading."
To make the consumer reliable, I have to ACK a particular message only after it has been committed on the Spark side. Note that the messages have to be ACKed on the same connection through which they were delivered, but the commit function is called at the driver node. How can I commit at the worker/executor node?
Recommended Answer
> I am returning a list of factories, and hope that every instance in the list will be used to create a reader, which will also be a consumer. Is that approach correct?

The socket source implementation has a single thread pushing messages into an internal ListBuffer. In other words, there is one consumer (the thread) filling the internal ListBuffer, which is **then** divided into partitions by `planInputPartitions` (`createDataReaderFactories` was renamed to `planInputPartitions`).

Also, according to the Javadoc of MicroBatchReadSupport:

> The execution engine will create a micro-batch reader at the start of a streaming query, alternate calls to setOffsetRange and createDataReaderFactories for each batch to process, and then call stop() when the execution is finished. Note that a single query may have multiple executions due to restart or failure recovery.

In other words, `createDataReaderFactories` should be called **multiple** times, which to my understanding suggests that each `DataReader` is responsible for a static input partition, which implies that the DataReader shouldn't be a consumer.

----------

> However, the commit method is a part of MicroBatchReader, not DataReader ... If so, what is the purpose of the commit function then?

Perhaps part of the rationale for the commit function is to prevent the internal buffer of the MicroBatchReader from growing large. By committing an offset, you effectively remove elements smaller than that offset from the buffer, since you are committing to not process them again. You can see this happening in the socket source code with `batches.trimStart(offsetDiff)`.

~~I'm not sure about implementing a reliable receiver, so I hope a more experienced Spark guy comes around and answers your question, as I'm interested too! Hope this helps!~~
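For intuition, here is a rough Java analogue of that trimming (the socket source itself is Scala; the buffer layout and names below are made up for the sketch):

```java
import java.util.ArrayList;
import java.util.List;

// Toy stand-in for the buffer a micro-batch reader keeps on the driver.
class BufferingReader {
    private final List<String> batches = new ArrayList<>();  // records not yet committed
    private long firstBufferedOffset = 0;                     // offset of batches.get(0)

    void add(String record) {
        batches.add(record);
    }

    // Called once a batch is durably committed: records below `end` will never be
    // requested again, so they can be dropped. This mirrors the effect of
    // batches.trimStart(offsetDiff) in the socket source.
    void commit(long end) {
        int offsetDiff = (int) Math.min(end - firstBufferedOffset, batches.size());
        if (offsetDiff > 0) {
            batches.subList(0, offsetDiff).clear();
            firstBufferedOffset += offsetDiff;
        }
    }
}
```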
EDIT
I had only studied the socket and wiki-edit sources. These sources are not production ready, which is not what the question was looking for. Instead, the kafka source is the better starting point: unlike the aforementioned sources, it has multiple consumers, like the author was looking for.
However, if you are fine with an unreliable source, the socket and wiki-edit sources above provide a less complicated solution.