Question
I have a script that includes opening a file from a list and then doing something to the text within that file. I'm using Python multiprocessing and Pool to try to parallelize this operation. An abstraction of the script is below:
import os
from multiprocessing import Pool

results = []

def testFunc(files):
    for file in files:
        print("Working in Process #%d" % os.getpid())
        # This is just an illustration of some logic. This is not what I'm actually doing.
        for line in file:
            if 'dog' in line:
                results.append(line)

if __name__ == "__main__":
    p = Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    results = p.apply_async(testFunc, args=(files,))
    results2 = results.get()
When I run this, the printed process id is the same for each iteration. Basically what I'm trying to do is take each element of the input list and fork it out to a separate process, but it seems like one process is doing all of the work.
Answer
- apply_async farms out one task to the pool. You would need to call apply_async many times to exercise more processors.
- Don't allow both processes to try to write to the same list, results. Since the pool workers are separate processes, the two won't be writing to the same list. One way to work around this is to use an output Queue. You could set it up yourself, or use apply_async's callback to set up the Queue for you. apply_async will call the callback once the function completes.
- You could use map_async instead of apply_async, but then you'd get a list of lists, which you'd then have to flatten.
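To illustrate the third point, here is a minimal sketch of the map_async variant. The find_dog_lines helper and the throwaway sample files are made up for the example (the original question reads from a fixed list of real paths); the point is that map_async hands one item to each worker and returns one list per item, which then has to be flattened.

```python
import os
import itertools
import tempfile
import multiprocessing as mp

def find_dog_lines(path):
    """Return the lines of one file that contain 'dog'."""
    print("Working in Process #%d" % os.getpid())
    with open(path) as f:
        return [line for line in f if 'dog' in line]

if __name__ == "__main__":
    # Two throwaway input files so the sketch runs anywhere;
    # substitute your real file list here.
    tmpdir = tempfile.mkdtemp()
    files = []
    for i, text in enumerate(["a dog barked\nno match\n", "cat\nanother dog\n"]):
        path = os.path.join(tmpdir, "file%d.txt" % i)
        with open(path, "w") as fh:
            fh.write(text)
        files.append(path)

    with mp.Pool(processes=2) as pool:
        # map_async returns one result list per input file,
        # i.e. a list of lists...
        nested = pool.map_async(find_dog_lines, files).get()

    # ...which is then flattened into a single list of matching lines.
    results = list(itertools.chain.from_iterable(nested))
    print(results)
```

Whether this is nicer than the apply_async-plus-callback version below is mostly a matter of taste; map_async keeps the results in input order, while callbacks fire in completion order.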
So, perhaps try instead something like:
import os
import multiprocessing as mp

results = []

def testFunc(file):
    result = []
    print("Working in Process #%d" % os.getpid())
    # This is just an illustration of some logic. This is not what I'm
    # actually doing.
    with open(file, 'r') as f:
        for line in f:
            if 'dog' in line:
                result.append(line)
    return result

def collect_results(result):
    results.extend(result)

if __name__ == "__main__":
    p = mp.Pool(processes=2)
    files = ['/path/to/file1.txt', '/path/to/file2.txt']
    for f in files:
        p.apply_async(testFunc, args=(f,), callback=collect_results)
    p.close()
    p.join()
    print(results)