問題描述
我正在使用 python 2.7,并嘗試在自己的進(jìn)程中運(yùn)行一些 CPU 繁重的任務(wù).我希望能夠?qū)⑾l(fā)送回父進(jìn)程,以使其了解進(jìn)程的當(dāng)前狀態(tài).多處理隊(duì)列似乎很適合這個(gè),但我不知道如何讓它工作.
I'm using python 2.7, and trying to run some CPU heavy tasks in their own processes. I would like to be able to send messages back to the parent process to keep it informed of the current status of the process. The multiprocessing Queue seems perfect for this but I can't figure out how to get it work.
所以,這是我的基本工作示例減去隊(duì)列的使用.
So, this is my basic working example minus the use of a Queue.
import multiprocessing as mp
import time
def f(x):
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print str(results.next())
pool.close()
pool.join()
if __name__ == '__main__':
main()
我嘗試以多種方式傳遞隊(duì)列,但它們收到錯(cuò)誤消息RuntimeError:隊(duì)列對(duì)象只能通過繼承在進(jìn)程之間共享".這是我根據(jù)我找到的早期答案嘗試的方法之一.(我在嘗試使用 Pool.map_async 和 Pool.imap 時(shí)遇到了同樣的問題)
I've tried passing the Queue in several ways, and they get the error message "RuntimeError: Queue objects should only be shared between processes through inheritance". Here is one of the ways I tried based on an earlier answer I found. (I get the same problem trying to use Pool.map_async and Pool.imap)
import multiprocessing as mp
import time
def f(args):
x = args[0]
q = args[1]
q.put(str(x))
time.sleep(0.1)
return x*x
def main():
q = mp.Queue()
pool = mp.Pool()
results = pool.imap_unordered(f, ([i, q] for i in range(1, 6)))
print str(q.get())
pool.close()
pool.join()
if __name__ == '__main__':
main()
最后,0 適應(yīng)度方法(使其成為全局)不會(huì)生成任何消息,它只是鎖定.
Finally, the 0 fitness approach (make it global) doesn't generate any messages, it just locks up.
import multiprocessing as mp
import time
q = mp.Queue()
def f(x):
q.put(str(x))
return x*x
def main():
pool = mp.Pool()
results = pool.imap_unordered(f, range(1, 6))
time.sleep(1)
print q.get()
pool.close()
pool.join()
if __name__ == '__main__':
main()
我知道它可能會(huì)直接與 multiprocessing.Process 一起使用,并且還有其他庫可以實(shí)現(xiàn)這一點(diǎn),但我不想放棄非常適合的標(biāo)準(zhǔn)庫函數(shù),直到我確定它不是只是我缺乏知識(shí)使我無法利用它們.
I'm aware that it will probably work with multiprocessing.Process directly and that there are other libraries to accomplish this, but I hate to back away from the standard library functions that are a great fit until I'm sure it's not just my lack of knowledge keeping me from being able to exploit them.
謝謝.
推薦答案
訣竅是將 Queue 作為參數(shù)傳遞給初始化程序.似乎適用于所有 Pool 調(diào)度方法.
The trick is to pass the Queue as an argument to the initializer. Appears to work with all the Pool dispatch methods.
import multiprocessing as mp
def f(x):
f.q.put('Doing: ' + str(x))
return x*x
def f_init(q):
f.q = q
def main():
jobs = range(1,6)
q = mp.Queue()
p = mp.Pool(None, f_init, [q])
results = p.imap(f, jobs)
p.close()
for i in range(len(jobs)):
print q.get()
print results.next()
if __name__ == '__main__':
main()
這篇關(guān)于我可以在 Pool.imap 調(diào)用的函數(shù)中使用多處理隊(duì)列嗎?的文章就介紹到這了,希望我們推薦的答案對(duì)大家有所幫助,也希望大家多多支持html5模板網(wǎng)!